allora_core/patterns/
content_router.rs

1//! Content-Based Router pattern: route an `Exchange` to one of several processors based on a header value.
2//!
3//! This implements the classic Enterprise Integration Pattern (EIP) "Content-Based Router".
4//! It examines a specific message header and selects a processor whose configured value matches.
5//!
6//! # Why It Exists
7//! In message-driven integrations you often need to send different message types (or business events)
8//! through distinct processing flows. Instead of embedding conditional logic inside processors or
9//! scattering `if/else` routes, a dedicated router centralizes this decision, improving readability
10//! and testability.
11//!
12//! # Why In `patterns` Folder
13//! The `patterns` module groups canonical EIP building blocks (filter, splitter, aggregator, etc.).
14//! `ContentBasedRouter` is one of those core patterns, so it lives alongside them for discoverability
15//! and conceptual cohesion rather than in a generic utilities namespace.
16//!
17//! # Behavior
18//! * Looks up a header (configured via `new(header_name)`).
19//! * If the header matches a registered route value (added via `when(value, processor)`), the associated
20//!   processor is invoked.
21//! * If no route matches, returns a `Routing` error (`Error::Routing`).
22//! * Exactly one processor is invoked (first matching key lookup).
23//!
24//! # Async vs Sync
25//! The router itself is lightweight; it defers to the selected processor. Under the `async` feature
26//! its `process` method is async and awaits the downstream processor; without it, synchronous dispatch occurs.
27//!
28//! # Example (basic usage)
29//! ```rust
30//! use allora_core::{patterns::content_router::ContentBasedRouter, processor::ClosureProcessor, route::Route, Exchange, Message};
31//! let hi = ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("hi route")); Ok(()) });
32//! let bye = ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("bye route")); Ok(()) });
33//! let router = ContentBasedRouter::new("kind").when("hi", Box::new(hi)).when("bye", Box::new(bye));
34//! let route = Route::new().add(router).build();
35//! let mut exchange = Exchange::new(Message::from_text("payload"));
36//! exchange.in_msg.set_header("kind", "hi");
37//! let rt = tokio::runtime::Runtime::new().unwrap();
38//! rt.block_on(async { route.run(&mut exchange).await.unwrap(); });
39//! assert_eq!(exchange.out_msg.unwrap().body_text(), Some("hi route"));
40//! ```
41//!
42//! # Error Handling
43//! No match: `Error::Routing("no matching route")`.
44//! Downstream processor errors propagate unchanged.
45//!
46//! # Testing Strategies
47//! * Provide several `when` routes, assert correct one chosen.
48//! * Assert error when header missing.
49//! * Use processors that mutate headers to verify only one is executed.
50//!
51//! # Future Extensions
52//! * Predicate-based routing (closures) instead of plain equality.
53//! * Default / fallback processor.
54//! * Support for wildcards / pattern matching.
55//! * Metrics counters (per route hit counts).
56use crate::{
57    error::{Error, Result},
58    processor::Processor,
59    Exchange,
60};
61use std::collections::HashMap;
62use std::fmt::Debug;
63
64#[derive(Debug)]
65pub struct ContentBasedRouter {
66    routes: HashMap<String, Box<dyn Processor>>,
67    header: String,
68}
69
70impl ContentBasedRouter {
71    /// Create a new [`ContentBasedRouter`] instance, specifying the header name to inspect for routing.
72    /// Example: `ContentBasedRouter::new("x-my-header")`.
73    ///
74    /// # Panics
75    /// Panics if the header name is empty.
76    pub fn new<H: Into<String>>(header: H) -> Self {
77        let h = header.into();
78        assert!(!h.is_empty(), "header name must not be empty");
79        Self {
80            routes: HashMap::new(),
81            header: h,
82        }
83    }
84
85    /// Add a route to the router, specifying a header value and the processor to handle messages with
86    /// that value.
87    /// Example: `.when("value1", Box::new(my_processor))`.
88    ///
89    /// # Panics
90    /// Panics if the value is empty.
91    pub fn when<V: Into<String>>(mut self, value: V, proc: Box<dyn Processor>) -> Self {
92        let v = value.into();
93        assert!(!v.is_empty(), "route value must not be empty");
94        self.routes.insert(v, proc);
95        self
96    }
97
98    /// Internal method to select the appropriate processor based on the exchange's message header.
99    /// Returns `None` if no matching processor is found.
100    fn select(&self, exchange: &Exchange) -> Option<&Box<dyn Processor>> {
101        exchange
102            .in_msg
103            .header(&self.header)
104            .and_then(|val| self.routes.get(val))
105    }
106}
107
108#[async_trait::async_trait]
109impl Processor for ContentBasedRouter {
110    /// Process the exchange by delegating to the selected processor based on the content-based routing.
111    /// Returns an error if no matching route is found.
112    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
113        if let Some(p) = self.select(exchange) {
114            p.process(exchange).await
115        } else {
116            Err(Error::Routing("no matching route".into()))
117        }
118    }
119}