allora_core/patterns/content_router.rs
1//! Content-Based Router pattern: route an `Exchange` to one of several processors based on a header value.
2//!
3//! This implements the classic Enterprise Integration Pattern (EIP) "Content-Based Router".
4//! It examines a specific message header and selects a processor whose configured value matches.
5//!
6//! # Why It Exists
7//! In message-driven integrations you often need to send different message types (or business events)
8//! through distinct processing flows. Instead of embedding conditional logic inside processors or
9//! scattering `if/else` routes, a dedicated router centralizes this decision, improving readability
10//! and testability.
11//!
12//! # Why In `patterns` Folder
13//! The `patterns` module groups canonical EIP building blocks (filter, splitter, aggregator, etc.).
14//! `ContentBasedRouter` is one of those core patterns, so it lives alongside them for discoverability
15//! and conceptual cohesion rather than in a generic utilities namespace.
16//!
17//! # Behavior
18//! * Looks up a header (configured via `new(header_name)`).
19//! * If the header matches a registered route value (added via `when(value, processor)`), the associated
20//! processor is invoked.
21//! * If no route matches, returns a `Routing` error (`Error::Routing`).
22//! * Exactly one processor is invoked (first matching key lookup).
23//!
24//! # Async vs Sync
25//! The router itself is lightweight; it defers to the selected processor. Under the `async` feature
26//! its `process` method is async and awaits the downstream processor; without it, synchronous dispatch occurs.
27//!
28//! # Example (basic usage)
29//! ```rust
30//! use allora_core::{patterns::content_router::ContentBasedRouter, processor::ClosureProcessor, route::Route, Exchange, Message};
31//! let hi = ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("hi route")); Ok(()) });
32//! let bye = ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("bye route")); Ok(()) });
33//! let router = ContentBasedRouter::new("kind").when("hi", Box::new(hi)).when("bye", Box::new(bye));
34//! let route = Route::new().add(router).build();
35//! let mut exchange = Exchange::new(Message::from_text("payload"));
36//! exchange.in_msg.set_header("kind", "hi");
37//! let rt = tokio::runtime::Runtime::new().unwrap();
38//! rt.block_on(async { route.run(&mut exchange).await.unwrap(); });
39//! assert_eq!(exchange.out_msg.unwrap().body_text(), Some("hi route"));
40//! ```
41//!
42//! # Error Handling
43//! No match: `Error::Routing("no matching route")`.
44//! Downstream processor errors propagate unchanged.
45//!
46//! # Testing Strategies
47//! * Provide several `when` routes, assert correct one chosen.
48//! * Assert error when header missing.
49//! * Use processors that mutate headers to verify only one is executed.
50//!
51//! # Future Extensions
52//! * Predicate-based routing (closures) instead of plain equality.
53//! * Default / fallback processor.
54//! * Support for wildcards / pattern matching.
55//! * Metrics counters (per route hit counts).
56use crate::{
57 error::{Error, Result},
58 processor::Processor,
59 Exchange,
60};
61use std::collections::HashMap;
62use std::fmt::Debug;
63
64#[derive(Debug)]
65pub struct ContentBasedRouter {
66 routes: HashMap<String, Box<dyn Processor>>,
67 header: String,
68}
69
70impl ContentBasedRouter {
71 /// Create a new [`ContentBasedRouter`] instance, specifying the header name to inspect for routing.
72 /// Example: `ContentBasedRouter::new("x-my-header")`.
73 ///
74 /// # Panics
75 /// Panics if the header name is empty.
76 pub fn new<H: Into<String>>(header: H) -> Self {
77 let h = header.into();
78 assert!(!h.is_empty(), "header name must not be empty");
79 Self {
80 routes: HashMap::new(),
81 header: h,
82 }
83 }
84
85 /// Add a route to the router, specifying a header value and the processor to handle messages with
86 /// that value.
87 /// Example: `.when("value1", Box::new(my_processor))`.
88 ///
89 /// # Panics
90 /// Panics if the value is empty.
91 pub fn when<V: Into<String>>(mut self, value: V, proc: Box<dyn Processor>) -> Self {
92 let v = value.into();
93 assert!(!v.is_empty(), "route value must not be empty");
94 self.routes.insert(v, proc);
95 self
96 }
97
98 /// Internal method to select the appropriate processor based on the exchange's message header.
99 /// Returns `None` if no matching processor is found.
100 fn select(&self, exchange: &Exchange) -> Option<&Box<dyn Processor>> {
101 exchange
102 .in_msg
103 .header(&self.header)
104 .and_then(|val| self.routes.get(val))
105 }
106}
107
108#[async_trait::async_trait]
109impl Processor for ContentBasedRouter {
110 /// Process the exchange by delegating to the selected processor based on the content-based routing.
111 /// Returns an error if no matching route is found.
112 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
113 if let Some(p) = self.select(exchange) {
114 p.process(exchange).await
115 } else {
116 Err(Error::Routing("no matching route".into()))
117 }
118 }
119}