allora_core/route.rs
1use crate::{error::Result, processor::Processor, Exchange};
2
3/// A `Route` represents an ordered pipeline of `Processor` implementations applied to
4/// an `Exchange`. Each processor can mutate the `Exchange` (e.g. enrich headers,
5/// transform the payload, set `out_msg`, assign correlation identifiers, etc.).
6///
7/// # Design Goals
8/// * Simple, minimal abstraction over a Vec of boxed processors.
9/// * Works in both synchronous and asynchronous modes (feature `async`).
10/// * Deterministic ordering: processors are executed in the order they were added.
11/// * Uniform error propagation: the first processor returning an error short-circuits the route.
12/// * Extensible: callers can wrap `Route::run` to add cross-cutting concerns (metrics, tracing, retry).
13///
14/// # When to Use
15/// Use a `Route` anytime you need to compose a series of message transformations / routing logic.
16/// It mirrors concepts from EIP frameworks where a route is an assembly of steps.
17///
18/// # Correlation & Message IDs
19/// If your processors rely on correlation IDs (e.g. aggregation), have an early processor call
20/// `exchange.in_msg.ensure_correlation_id()` to guarantee one is present. `Message` already
21/// auto-generates a `message_id` header on creation.
22///
23/// # Examples
24/// Unified example:
25/// ```rust
26/// use allora_core::{processor::ClosureProcessor, route::Route, Exchange, Message};
27/// let route = Route::new().add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("done")); Ok(()) })).build();
28/// let mut exchange = Exchange::new(Message::from_text("hi"));
29/// tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
30/// assert_eq!(exchange.out_msg.unwrap().body_text(), Some("done"));
31/// ```
32///
33/// # Error Handling
34/// If any processor returns an `Err`, processing stops immediately and the error is returned to
35/// the caller. Downstream processors are not executed.
36///
37/// # Future Extensions (Ideas)
38/// * Conditional routing (e.g. only execute processor if a header matches).
39/// * Branch / fork processors returning multiple Exchanges.
40/// * Built-in metrics / tracing instrumentation wrapper.
41/// * Middleware style before / after hooks.
42///
43/// Create routes via the builder pattern: `Route::new().add(...).add(...).build()`.
44#[derive(Default, Debug)]
45pub struct Route {
46 processors: Vec<Box<dyn Processor>>,
47}
48
49impl Route {
50 /// Create an empty route.
51 pub fn new() -> Self {
52 Self {
53 processors: Vec::new(),
54 }
55 }
56 /// Add a processor to the route. Processors execute in insertion order.
57 pub fn add<P: Processor + 'static>(mut self, p: P) -> Self {
58 self.processors.push(Box::new(p));
59 self
60 }
61 /// Finalize the route (currently a no-op, kept for API symmetry / future extension).
62 pub fn build(self) -> Self {
63 self
64 }
65 /// Convenience: create a route whose first processor ensures a correlation id.
66 /// Optionally mirror the correlation id into an additional header name.
67 /// Example:
68 /// ```rust
69 /// use allora_core::{route::Route, processor::ClosureProcessor, Message, Exchange};
70 /// let route = Route::with_correlation(None)
71 /// .add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("pong")); Ok(()) }))
72 /// .build();
73 /// let mut exchange = Exchange::new(Message::from_text("ping"));
74 /// tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
75 /// assert!(exchange.in_msg.header("correlation_id").is_some());
76 /// ```
77 pub fn with_correlation(mirror_header: Option<&str>) -> Self {
78 use crate::patterns::correlation_initializer::CorrelationInitializer;
79 let init = match mirror_header {
80 Some(h) => CorrelationInitializer::with_mirror(h),
81 None => CorrelationInitializer::default(),
82 };
83 Self {
84 processors: vec![Box::new(init)],
85 }
86 }
87 /// Run the route asynchronously over a mutable `Exchange`.
88 pub async fn run(&self, exchange: &mut Exchange) -> Result<()> {
89 for p in &self.processors {
90 p.process(exchange).await?;
91 }
92 Ok(())
93 }
94}