pub struct Route { /* private fields */ }
A Route represents an ordered pipeline of Processor implementations applied to
an Exchange. Each processor can mutate the Exchange (e.g. enrich headers,
transform the payload, set out_msg, assign correlation identifiers, etc.).
§Design Goals
- Simple, minimal abstraction over a Vec of boxed processors.
- Works in both synchronous and asynchronous modes (feature async).
- Deterministic ordering: processors are executed in the order they were added.
- Uniform error propagation: the first processor returning an error short-circuits the route.
- Extensible: callers can wrap Route::run to add cross-cutting concerns (metrics, tracing, retry).
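As a rough illustration of the "wrap the run call" idea in the last goal, the sketch below adds retry and timing around an arbitrary fallible operation. It uses only the standard library. `run_with_retry` and `RunReport` are hypothetical names invented for this example, not part of allora_core, and the closure stands in for whatever invokes the route. The sketch is synchronous, so an async route would need an async equivalent.

```rust
use std::time::{Duration, Instant};

/// Outcome of a wrapped run: the final result plus simple metrics.
/// Hypothetical type for illustration; not part of allora_core.
pub struct RunReport<E> {
    pub result: Result<(), E>,
    pub attempts: u32,
    pub elapsed: Duration,
}

/// Call `run` until it succeeds or `max_attempts` is reached.
/// The operation always runs at least once, even if `max_attempts` is 0.
pub fn run_with_retry<F, E>(max_attempts: u32, mut run: F) -> RunReport<E>
where
    F: FnMut() -> Result<(), E>,
{
    let started = Instant::now();
    let mut attempts = 0;
    loop {
        attempts += 1;
        let result = run();
        if result.is_ok() || attempts >= max_attempts {
            return RunReport {
                result,
                attempts,
                elapsed: started.elapsed(),
            };
        }
    }
}
```

One design caveat: processors may already have mutated the exchange when a run fails, so a retry wrapper like this is only safe if the processors are idempotent or the exchange is reset between attempts.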
§When to Use
Use a Route whenever you need to compose a series of message transformations or routing steps.
It mirrors concepts from EIP frameworks where a route is an assembly of steps.
§Correlation & Message IDs
If your processors rely on correlation IDs (e.g. aggregation), have an early processor call
exchange.in_msg.ensure_correlation_id() to guarantee one is present. A Message already
generates a message_id header automatically on creation.
§Examples
Unified example:
use allora_core::{processor::ClosureProcessor, route::Route, Exchange, Message};
let route = Route::new().add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("done")); Ok(()) })).build();
let mut exchange = Exchange::new(Message::from_text("hi"));
tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
assert_eq!(exchange.out_msg.unwrap().body_text(), Some("done"));
§Error Handling
If any processor returns an Err, processing stops immediately and the error is returned to
the caller. Downstream processors are not executed.
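The short-circuit behavior can be modeled with a minimal, self-contained sketch. The `Exchange`, `Step`, and `Pipeline` types below are simplified stand-ins written for this example, not the allora_core definitions, and the sketch is synchronous. It only shows how the `?` operator stops the loop at the first error.

```rust
// Hypothetical stand-in types for illustration; not the allora_core definitions.
#[derive(Debug, Default)]
pub struct Exchange {
    /// Records which steps ran, so the short-circuit is observable.
    pub trace: Vec<&'static str>,
}

type Step = Box<dyn Fn(&mut Exchange) -> Result<(), String>>;

#[derive(Default)]
pub struct Pipeline {
    steps: Vec<Step>,
}

impl Pipeline {
    /// Append a step; steps run in insertion order.
    pub fn add<F>(mut self, f: F) -> Self
    where
        F: Fn(&mut Exchange) -> Result<(), String> + 'static,
    {
        self.steps.push(Box::new(f));
        self
    }

    /// Run every step in order, returning the first error encountered.
    pub fn run(&self, exchange: &mut Exchange) -> Result<(), String> {
        for step in &self.steps {
            step(exchange)?; // `?` returns early, so later steps never run
        }
        Ok(())
    }
}

/// Build a three-step pipeline whose second step fails.
pub fn failing_pipeline() -> Pipeline {
    Pipeline::default()
        .add(|ex| { ex.trace.push("first"); Ok(()) })
        .add(|_ex| Err("second failed".to_string()))
        .add(|ex| { ex.trace.push("third"); Ok(()) })
}
```

Running `failing_pipeline()` against a fresh `Exchange` returns the second step's error, and the trace contains only the first step. Any mutation made before the failure remains visible to the caller.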
§Future Extensions (Ideas)
- Conditional routing (e.g. only execute processor if a header matches).
- Branch / fork processors returning multiple Exchanges.
- Built-in metrics / tracing instrumentation wrapper.
- Middleware style before / after hooks.
Create routes via the builder pattern: Route::new().add(...).add(...).build().
Implementations§
impl Route
pub fn add<P>(self, p: P) -> Route
where
    P: Processor + 'static,
Add a processor to the route. Processors execute in insertion order.
pub fn build(self) -> Route
Finalize the route (currently a no-op, kept for API symmetry / future extension).
pub fn with_correlation(mirror_header: Option<&str>) -> Route
Convenience constructor: creates a route whose first processor ensures a correlation ID is present. If mirror_header is Some, the correlation ID is also mirrored into a header with that name. Example:
use allora_core::{route::Route, processor::ClosureProcessor, Message, Exchange};
let route = Route::with_correlation(None)
.add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("pong")); Ok(()) }))
.build();
let mut exchange = Exchange::new(Message::from_text("ping"));
tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
assert!(exchange.in_msg.header("correlation_id").is_some());
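To make the "ensure, then optionally mirror" semantics concrete, here is a standalone sketch over a plain header map. It is not how allora_core implements this. The `Headers` alias, the counter-based ID format, and both functions are assumptions made for illustration. Only the `correlation_id` header name is taken from the example above.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for a message's header map; not the allora_core type.
pub type Headers = HashMap<String, String>;

// Simple process-wide counter used to make up IDs for this sketch.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Return the existing correlation id, or generate and store one if absent.
pub fn ensure_correlation_id(headers: &mut Headers) -> String {
    headers
        .entry("correlation_id".to_string())
        .or_insert_with(|| format!("corr-{}", NEXT_ID.fetch_add(1, Ordering::Relaxed)))
        .clone()
}

/// Ensure a correlation id, then optionally copy it into a second header.
pub fn correlate(headers: &mut Headers, mirror_header: Option<&str>) {
    let id = ensure_correlation_id(headers);
    if let Some(name) = mirror_header {
        headers.insert(name.to_string(), id);
    }
}
```

The point of "ensure" is idempotence: an ID that is already present is kept, so running the step more than once, or on a message that arrived with an ID, does not break correlation.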