1#[cfg(feature = "amqp")]
7pub mod amqp;
8#[cfg(feature = "aws")]
9pub mod aws;
10pub mod fanout;
11pub mod file;
12#[cfg(any(feature = "http-client", feature = "http-server"))]
13pub mod http;
14#[cfg(feature = "ibm-mq")]
15pub mod ibm_mq;
16#[cfg(feature = "kafka")]
17pub mod kafka;
18pub mod memory;
19#[cfg(feature = "mongodb")]
20pub mod mongodb;
21#[cfg(feature = "mqtt")]
22pub mod mqtt;
23#[cfg(feature = "nats")]
24pub mod nats;
25pub mod null;
26pub mod response;
27pub mod static_endpoint;
28pub mod switch;
29#[cfg(feature = "zeromq")]
30pub mod zeromq;
31use crate::endpoints::memory::{get_or_create_channel, MemoryChannel};
32use crate::middleware::apply_middlewares_to_consumer;
33use crate::models::{Endpoint, EndpointType, MemoryConfig, Middleware, ResponseConfig};
34use crate::route::get_endpoint_factory;
35use crate::traits::{BoxFuture, MessageConsumer, MessagePublisher};
36use anyhow::{anyhow, Result};
37use std::sync::Arc;
38
39impl Endpoint {
40 pub fn new(endpoint_type: EndpointType) -> Self {
41 Self {
42 middlewares: Vec::new(),
43 endpoint_type,
44 handler: None,
45 }
46 }
47 pub fn new_memory(topic: &str, capacity: usize) -> Self {
56 Self::new(EndpointType::Memory(MemoryConfig::new(
57 topic,
58 Some(capacity),
59 )))
60 }
61 pub fn new_response() -> Self {
62 Self::new(EndpointType::Response(ResponseConfig::default()))
63 }
64 pub fn add_middleware(mut self, middleware: Middleware) -> Self {
65 self.middlewares.push(middleware);
66 self
67 }
68 pub fn add_middlewares(mut self, mut middlewares: Vec<Middleware>) -> Self {
69 self.middlewares.append(&mut middlewares);
70 self
71 }
72 pub fn channel(&self) -> anyhow::Result<MemoryChannel> {
78 match &self.endpoint_type {
79 EndpointType::Memory(cfg) => Ok(get_or_create_channel(cfg)),
80 _ => Err(anyhow::anyhow!("channel() called on non-memory Endpoint")),
81 }
82 }
83 pub fn null() -> Self {
84 Self::new(EndpointType::Null)
85 }
86
87 pub async fn create_consumer(
88 &self,
89 route_name: &str,
90 ) -> anyhow::Result<Box<dyn crate::traits::MessageConsumer>> {
91 crate::endpoints::create_consumer_from_route(route_name, self).await
92 }
93
94 pub async fn create_publisher(&self, _route_name: &str) -> anyhow::Result<crate::Publisher> {
95 crate::Publisher::new(self.clone()).await
96 }
97
98 pub fn check_consumer(
99 &self,
100 route_name: &str,
101 allowed_endpoints: Option<&[&str]>,
102 ) -> anyhow::Result<()> {
103 crate::endpoints::check_consumer(route_name, self, allowed_endpoints)
104 }
105
106 pub fn check_publisher(
107 &self,
108 route_name: &str,
109 allowed_endpoints: Option<&[&str]>,
110 ) -> anyhow::Result<()> {
111 crate::endpoints::check_publisher(route_name, self, allowed_endpoints)
112 }
113}
114
115pub fn check_consumer(
117 route_name: &str,
118 endpoint: &Endpoint,
119 allowed_types: Option<&[&str]>,
120) -> Result<()> {
121 if endpoint.handler.is_some() {
122 tracing::warn!(
123 route = route_name,
124 "Endpoint 'handler' is set on an input endpoint. Handlers are currently only supported on output endpoints (publishers) and will be ignored here."
125 );
126 }
127 if let Some(allowed) = allowed_types {
128 if !endpoint.endpoint_type.is_core() {
129 let name = endpoint.endpoint_type.name();
130 if !allowed.contains(&name) {
131 return Err(anyhow!(
132 "[route:{}] Endpoint type '{}' is not allowed by policy",
133 route_name,
134 name
135 ));
136 }
137 }
138 }
139 match &endpoint.endpoint_type {
140 #[cfg(feature = "aws")]
141 EndpointType::Aws(_) => Ok(()),
142 #[cfg(feature = "kafka")]
143 EndpointType::Kafka(_) => Ok(()),
144 #[cfg(feature = "nats")]
145 EndpointType::Nats(cfg) => {
146 if cfg.stream.is_none() {
147 return Err(anyhow!(
148 "[route:{}] NATS consumer must specify a 'stream'",
149 route_name
150 ));
151 }
152 Ok(())
153 }
154 #[cfg(feature = "amqp")]
155 EndpointType::Amqp(_) => Ok(()),
156 #[cfg(feature = "mqtt")]
157 EndpointType::Mqtt(_) => Ok(()),
158 #[cfg(feature = "zeromq")]
159 EndpointType::ZeroMq(_) => Ok(()),
160 #[cfg(feature = "ibm-mq")]
161 EndpointType::IbmMq(_) => Ok(()),
162 #[cfg(feature = "mongodb")]
163 EndpointType::MongoDb(_) => Ok(()),
164 #[cfg(any(feature = "http-client", feature = "http-server"))]
165 EndpointType::Http(_) => {
166 #[cfg(not(feature = "http-server"))]
167 {
168 Err(anyhow!("HTTP consumer requires the 'http-server' feature"))
169 }
170 #[cfg(feature = "http-server")]
171 Ok(())
172 }
173 EndpointType::Static(_) => Ok(()),
174 EndpointType::Memory(_) => Ok(()),
175 EndpointType::File(_) => Ok(()),
176 EndpointType::Custom { .. } => Ok(()),
177 EndpointType::Switch(_) => Err(anyhow!(
178 "[route:{}] Switch endpoint is only supported as an output",
179 route_name
180 )),
181 #[allow(unreachable_patterns)]
182 _ => {
183 if let Some(allowed) = allowed_types {
184 let name = endpoint.endpoint_type.name();
185 if allowed.contains(&name) {
186 return Ok(());
187 }
188 }
189 Err(anyhow!(
190 "[route:{}] Unsupported consumer endpoint type",
191 route_name
192 ))
193 }
194 }
195}
196
197pub async fn create_consumer_from_route(
199 route_name: &str,
200 endpoint: &Endpoint,
201) -> Result<Box<dyn MessageConsumer>> {
202 check_consumer(route_name, endpoint, None)?;
203 let consumer = create_base_consumer(route_name, endpoint).await?;
204 apply_middlewares_to_consumer(consumer, endpoint, route_name).await
205}
206
207async fn create_base_consumer(
208 route_name: &str,
209 endpoint: &Endpoint,
210) -> Result<Box<dyn MessageConsumer>> {
211 fn boxed<T: MessageConsumer + 'static>(c: T) -> Box<dyn MessageConsumer> {
213 Box::new(c)
214 }
215
216 match &endpoint.endpoint_type {
217 #[cfg(feature = "aws")]
218 EndpointType::Aws(cfg) => Ok(boxed(aws::AwsConsumer::new(cfg).await?)),
219 #[cfg(feature = "kafka")]
220 EndpointType::Kafka(cfg) => {
221 let mut config = cfg.clone();
222 if config.topic.is_none() {
223 config.topic = Some(route_name.to_string());
224 }
225 Ok(boxed(kafka::KafkaConsumer::new(&config).await?))
226 }
227 #[cfg(feature = "nats")]
228 EndpointType::Nats(cfg) => {
229 let mut config = cfg.clone();
230 if config.subject.is_none() {
231 config.subject = Some(route_name.to_string());
232 }
233 Ok(boxed(nats::NatsConsumer::new(&config).await?))
234 }
235 #[cfg(feature = "amqp")]
236 EndpointType::Amqp(cfg) => {
237 let mut config = cfg.clone();
238 if config.queue.is_none() {
239 config.queue = Some(route_name.to_string());
240 }
241 Ok(boxed(amqp::AmqpConsumer::new(&config).await?))
242 }
243 #[cfg(feature = "mqtt")]
244 EndpointType::Mqtt(cfg) => {
245 let mut config = cfg.clone();
246 if config.topic.is_none() {
247 config.topic = Some(route_name.to_string());
248 }
249 if config.client_id.is_none() && !config.clean_session {
250 config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
252 }
253 Ok(boxed(mqtt::MqttConsumer::new(cfg).await?))
254 }
255 #[cfg(feature = "ibm-mq")]
256 EndpointType::IbmMq(cfg) => {
257 let mut config = cfg.clone();
258 if config.queue.is_none() && config.topic.is_none() {
259 config.queue = Some(route_name.to_string());
260 }
261 Ok(boxed(ibm_mq::IbmMqConsumer::new(&config).await?))
262 }
263 #[cfg(feature = "zeromq")]
264 EndpointType::ZeroMq(cfg) => Ok(boxed(zeromq::ZeroMqConsumer::new(cfg).await?)),
265 EndpointType::File(cfg) => Ok(boxed(file::FileConsumer::new(cfg).await?)),
266 #[cfg(any(feature = "http-client", feature = "http-server"))]
267 EndpointType::Http(cfg) => {
268 #[cfg(feature = "http-server")]
269 {
270 Ok(boxed(http::HttpConsumer::new(cfg).await?))
271 }
272 #[cfg(not(feature = "http-server"))]
273 {
274 Err(anyhow!("HTTP consumer requires the 'http-server' feature"))
275 }
276 }
277 EndpointType::Static(cfg) => Ok(boxed(static_endpoint::StaticRequestConsumer::new(cfg)?)),
278 EndpointType::Memory(cfg) => Ok(boxed(memory::MemoryConsumer::new(cfg)?)),
279 #[cfg(feature = "mongodb")]
280 EndpointType::MongoDb(cfg) => {
281 let mut config = cfg.clone();
282 if config.collection.is_none() {
283 config.collection = Some(route_name.to_string());
284 }
285 if config.change_stream {
286 if config.ttl_seconds.is_none() {
287 config.ttl_seconds = Some(86400); }
289 Ok(boxed(mongodb::MongoDbSubscriber::new(&config).await?))
290 } else {
291 Ok(boxed(mongodb::MongoDbConsumer::new(&config).await?))
292 }
293 }
294 EndpointType::Custom { name, config } => {
295 let factory = get_endpoint_factory(name)
296 .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
297 factory.create_consumer(route_name, config).await
298 }
299 EndpointType::Switch(_) => Err(anyhow!(
300 "[route:{}] Switch endpoint is only supported as an output",
301 route_name
302 )),
303 #[allow(unreachable_patterns)]
304 _ => Err(anyhow!(
305 "[route:{}] Unsupported consumer endpoint type",
306 route_name
307 )),
308 }
309}
310
311pub fn check_publisher(
313 route_name: &str,
314 endpoint: &Endpoint,
315 allowed_types: Option<&[&str]>,
316) -> Result<()> {
317 check_publisher_recursive(route_name, endpoint, 0, allowed_types)
318}
319
320fn check_publisher_recursive(
321 route_name: &str,
322 endpoint: &Endpoint,
323 depth: usize,
324 allowed_types: Option<&[&str]>,
325) -> Result<()> {
326 if let Some(allowed) = allowed_types {
327 if !endpoint.endpoint_type.is_core() {
328 let name = endpoint.endpoint_type.name();
329 if !allowed.contains(&name) {
330 return Err(anyhow!(
331 "[route:{}] Endpoint type '{}' is not allowed by policy",
332 route_name,
333 name
334 ));
335 }
336 }
337 }
338 const MAX_DEPTH: usize = 16;
339 if depth > MAX_DEPTH {
340 return Err(anyhow!(
341 "Fanout recursion depth exceeded limit of {}",
342 MAX_DEPTH
343 ));
344 }
345 match &endpoint.endpoint_type {
346 #[cfg(feature = "aws")]
347 EndpointType::Aws(_) => Ok(()),
348 #[cfg(feature = "kafka")]
349 EndpointType::Kafka(_) => Ok(()),
350 #[cfg(feature = "nats")]
351 EndpointType::Nats(_) => Ok(()),
352 #[cfg(feature = "amqp")]
353 EndpointType::Amqp(_) => Ok(()),
354 #[cfg(feature = "mqtt")]
355 EndpointType::Mqtt(_) => Ok(()),
356 #[cfg(feature = "zeromq")]
357 EndpointType::ZeroMq(_) => Ok(()),
358 #[cfg(any(feature = "http-client", feature = "http-server"))]
359 EndpointType::Http(_) => {
360 #[cfg(not(feature = "reqwest"))]
361 {
362 Err(anyhow!("HTTP publisher requires the 'reqwest' feature"))
363 }
364 #[cfg(feature = "reqwest")]
365 Ok(())
366 }
367 #[cfg(feature = "mongodb")]
368 EndpointType::MongoDb(_) => Ok(()),
369 EndpointType::File(_) => Ok(()),
370 EndpointType::Static(_) => Ok(()),
371 EndpointType::Memory(_) => Ok(()),
372 EndpointType::Null => Ok(()),
373 EndpointType::Fanout(endpoints) => {
374 for endpoint in endpoints {
375 check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
376 }
377 Ok(())
378 }
379 EndpointType::Switch(cfg) => {
380 for endpoint in cfg.cases.values() {
381 check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
382 }
383 if let Some(endpoint) = &cfg.default {
384 check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
385 }
386 Ok(())
387 }
388 EndpointType::Response(_) => Ok(()),
389 EndpointType::Custom { .. } => Ok(()),
390 #[allow(unreachable_patterns)]
391 _ => {
392 if let Some(allowed) = allowed_types {
393 let name = endpoint.endpoint_type.name();
394 if allowed.contains(&name) {
395 return Ok(());
396 }
397 }
398 Err(anyhow!(
399 "[route:{}] Unsupported publisher endpoint type",
400 route_name
401 ))
402 }
403 }
404}
405
406pub async fn create_publisher_from_route(
408 route_name: &str,
409 endpoint: &Endpoint,
410) -> Result<Arc<dyn MessagePublisher>> {
411 check_publisher(route_name, endpoint, None)?;
412 create_publisher_with_depth(route_name, endpoint, 0).await
413}
414
415fn create_publisher_with_depth<'a>(
416 route_name: &'a str,
417 endpoint: &'a Endpoint,
418 depth: usize,
419) -> BoxFuture<'a, Result<Arc<dyn MessagePublisher>>> {
420 Box::pin(async move {
421 const MAX_DEPTH: usize = 16;
422 if depth > MAX_DEPTH {
423 return Err(anyhow!(
424 "Fanout recursion depth exceeded limit of {}",
425 MAX_DEPTH
426 ));
427 }
428 let mut publisher =
429 create_base_publisher(route_name, &endpoint.endpoint_type, depth).await?;
430 if let Some(handler) = &endpoint.handler {
431 publisher = Box::new(crate::command_handler::CommandPublisher::new(
432 publisher,
433 handler.clone(),
434 ));
435 }
436 crate::middleware::apply_middlewares_to_publisher(publisher, endpoint, route_name).await
437 })
438}
439
440async fn create_base_publisher(
441 route_name: &str,
442 endpoint_type: &EndpointType,
443 depth: usize,
444) -> Result<Box<dyn MessagePublisher>> {
445 let publisher = match endpoint_type {
446 #[cfg(feature = "aws")]
447 EndpointType::Aws(cfg) => {
448 Ok(Box::new(aws::AwsPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
449 }
450 #[cfg(feature = "kafka")]
451 EndpointType::Kafka(cfg) => {
452 let mut config = cfg.clone();
453 if config.topic.is_none() {
454 config.topic = Some(route_name.to_string());
455 }
456 Ok(Box::new(kafka::KafkaPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
457 }
458 #[cfg(feature = "nats")]
459 EndpointType::Nats(cfg) => {
460 let mut config = cfg.clone();
461 if config.subject.is_none() {
462 config.subject = Some(route_name.to_string());
463 }
464 Ok(Box::new(nats::NatsPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
465 }
466 #[cfg(feature = "amqp")]
467 EndpointType::Amqp(cfg) => {
468 let mut config = cfg.clone();
469 if config.queue.is_none() {
470 config.queue = Some(route_name.to_string());
471 }
472 Ok(Box::new(amqp::AmqpPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
473 }
474 #[cfg(feature = "mqtt")]
475 EndpointType::Mqtt(cfg) => {
476 let mut config = cfg.clone();
477 if config.topic.is_none() {
478 config.topic = Some(route_name.to_string());
479 }
480 if config.client_id.is_none() {
481 config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
482 }
483 Ok(Box::new(mqtt::MqttPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
484 }
485 #[cfg(feature = "zeromq")]
486 EndpointType::ZeroMq(cfg) => {
487 Ok(Box::new(zeromq::ZeroMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
488 }
489 #[cfg(any(feature = "http-client", feature = "http-server"))]
490 EndpointType::Http(cfg) => {
491 #[cfg(feature = "reqwest")]
492 {
493 let sink = http::HttpPublisher::new(cfg).await?;
494 Ok(Box::new(sink) as Box<dyn MessagePublisher>)
495 }
496 #[cfg(not(feature = "reqwest"))]
497 {
498 Err(anyhow!("HTTP publisher requires the 'reqwest' feature"))
499 }
500 }
501 #[cfg(feature = "mongodb")]
502 EndpointType::MongoDb(cfg) => {
503 let mut config = cfg.clone();
504 if config.collection.is_none() {
505 config.collection = Some(route_name.to_string());
506 }
507 Ok(Box::new(mongodb::MongoDbPublisher::new(&config).await?)
508 as Box<dyn MessagePublisher>)
509 }
510 EndpointType::File(cfg) => {
511 Ok(Box::new(file::FilePublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
512 }
513 EndpointType::Static(cfg) => Ok(Box::new(static_endpoint::StaticEndpointPublisher::new(
514 cfg,
515 )?) as Box<dyn MessagePublisher>),
516 EndpointType::Memory(cfg) => {
517 Ok(Box::new(memory::MemoryPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
518 }
519 EndpointType::Null => Ok(Box::new(null::NullPublisher) as Box<dyn MessagePublisher>),
520 EndpointType::Fanout(endpoints) => {
521 let mut publishers = Vec::with_capacity(endpoints.len());
522 for endpoint in endpoints {
523 let p = create_publisher_with_depth(route_name, endpoint, depth + 1).await?;
524 publishers.push(p);
525 }
526 Ok(Box::new(fanout::FanoutPublisher::new(publishers)) as Box<dyn MessagePublisher>)
527 }
528 EndpointType::Switch(cfg) => {
529 let mut cases = std::collections::HashMap::new();
530 for (key, endpoint) in &cfg.cases {
531 let p = create_publisher_with_depth(route_name, endpoint, depth + 1).await?;
532 cases.insert(key.clone(), p);
533 }
534 let default = if let Some(endpoint) = &cfg.default {
535 Some(create_publisher_with_depth(route_name, endpoint, depth + 1).await?)
536 } else {
537 None
538 };
539 Ok(Box::new(switch::SwitchPublisher::new(
540 cfg.metadata_key.clone(),
541 cases,
542 default,
543 )) as Box<dyn MessagePublisher>)
544 }
545 EndpointType::Response(_) => {
546 Ok(Box::new(response::ResponsePublisher) as Box<dyn MessagePublisher>)
547 }
548 EndpointType::Custom { name, config } => {
549 let factory = get_endpoint_factory(name)
550 .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
551 factory.create_publisher(route_name, config).await
552 }
553 #[allow(unreachable_patterns)]
554 _ => Err(anyhow!(
555 "[route:{}] Unsupported publisher endpoint type",
556 route_name
557 )),
558 }?;
559 Ok(publisher)
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::MemoryConfig;
    use crate::models::{Endpoint, EndpointType};
    use crate::CanonicalMessage;

    /// A fanout over two memory endpoints must deliver one copy of the message
    /// to each underlying channel.
    #[tokio::test]
    async fn test_fanout_publisher_integration() {
        let first = Endpoint::new_memory("fanout_1", 10);
        let second = Endpoint::new_memory("fanout_2", 10);

        // Grab the channels before the endpoints are moved into the fanout.
        let first_channel = first.channel().unwrap();
        let second_channel = second.channel().unwrap();
        let fanout_ep = Endpoint::new(EndpointType::Fanout(vec![first, second]));

        let publisher = create_publisher_from_route("test_fanout", &fanout_ep)
            .await
            .expect("Failed to create fanout publisher");

        let outgoing = CanonicalMessage::new(b"fanout_payload".to_vec(), None);
        publisher
            .send(outgoing)
            .await
            .expect("Failed to send message");

        assert_eq!(first_channel.len(), 1);
        assert_eq!(second_channel.len(), 1);

        let received_first = first_channel.drain_messages().pop().unwrap();
        let received_second = second_channel.drain_messages().pop().unwrap();

        assert_eq!(received_first.payload, "fanout_payload".as_bytes());
        assert_eq!(received_second.payload, "fanout_payload".as_bytes());
    }

    /// A memory endpoint configured with `with_subscribe(true)` must still be
    /// built as a `MemoryConsumer` by the factory.
    #[tokio::test]
    async fn test_factory_creates_memory_subscriber() {
        let memory_config = MemoryConfig::new("mem".to_string(), None).with_subscribe(true);
        let endpoint = Endpoint {
            endpoint_type: EndpointType::Memory(memory_config),
            middlewares: vec![],
            handler: None,
        };

        let consumer = create_consumer_from_route("test", &endpoint).await.unwrap();
        let is_subscriber = consumer
            .as_any()
            .is::<crate::endpoints::memory::MemoryConsumer>();
        assert!(is_subscriber, "Factory should create MemoryConsumer");
    }
}