allora_core/
route.rs

1use crate::{error::Result, processor::Processor, Exchange};
2
3/// A `Route` represents an ordered pipeline of `Processor` implementations applied to
4/// an `Exchange`. Each processor can mutate the `Exchange` (e.g. enrich headers,
5/// transform the payload, set `out_msg`, assign correlation identifiers, etc.).
6///
7/// # Design Goals
8/// * Simple, minimal abstraction over a Vec of boxed processors.
9/// * Works in both synchronous and asynchronous modes (feature `async`).
10/// * Deterministic ordering: processors are executed in the order they were added.
11/// * Uniform error propagation: the first processor returning an error short-circuits the route.
12/// * Extensible: callers can wrap `Route::run` to add cross-cutting concerns (metrics, tracing, retry).
13///
14/// # When to Use
15/// Use a `Route` anytime you need to compose a series of message transformations / routing logic.
16/// It mirrors concepts from EIP frameworks where a route is an assembly of steps.
17///
18/// # Correlation & Message IDs
19/// If your processors rely on correlation IDs (e.g. aggregation), have an early processor call
20/// `exchange.in_msg.ensure_correlation_id()` to guarantee one is present. `Message` already
21/// auto-generates a `message_id` header on creation.
22///
23/// # Examples
24/// Unified example:
25/// ```rust
26/// use allora_core::{processor::ClosureProcessor, route::Route, Exchange, Message};
27/// let route = Route::new().add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("done")); Ok(()) })).build();
28/// let mut exchange = Exchange::new(Message::from_text("hi"));
29/// tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
30/// assert_eq!(exchange.out_msg.unwrap().body_text(), Some("done"));
31/// ```
32///
33/// # Error Handling
34/// If any processor returns an `Err`, processing stops immediately and the error is returned to
35/// the caller. Downstream processors are not executed.
36///
37/// # Future Extensions (Ideas)
38/// * Conditional routing (e.g. only execute processor if a header matches).
39/// * Branch / fork processors returning multiple Exchanges.
40/// * Built-in metrics / tracing instrumentation wrapper.
41/// * Middleware style before / after hooks.
42///
43/// Create routes via the builder pattern: `Route::new().add(...).add(...).build()`.
44#[derive(Default, Debug)]
45pub struct Route {
46    processors: Vec<Box<dyn Processor>>,
47}
48
49impl Route {
50    /// Create an empty route.
51    pub fn new() -> Self {
52        Self {
53            processors: Vec::new(),
54        }
55    }
56    /// Add a processor to the route. Processors execute in insertion order.
57    pub fn add<P: Processor + 'static>(mut self, p: P) -> Self {
58        self.processors.push(Box::new(p));
59        self
60    }
61    /// Finalize the route (currently a no-op, kept for API symmetry / future extension).
62    pub fn build(self) -> Self {
63        self
64    }
65    /// Convenience: create a route whose first processor ensures a correlation id.
66    /// Optionally mirror the correlation id into an additional header name.
67    /// Example:
68    /// ```rust
69    /// use allora_core::{route::Route, processor::ClosureProcessor, Message, Exchange};
70    /// let route = Route::with_correlation(None)
71    ///     .add(ClosureProcessor::new(|exchange| { exchange.out_msg = Some(Message::from_text("pong")); Ok(()) }))
72    ///     .build();
73    /// let mut exchange = Exchange::new(Message::from_text("ping"));
74    /// tokio::runtime::Runtime::new().unwrap().block_on(async { route.run(&mut exchange).await.unwrap(); });
75    /// assert!(exchange.in_msg.header("correlation_id").is_some());
76    /// ```
77    pub fn with_correlation(mirror_header: Option<&str>) -> Self {
78        use crate::patterns::correlation_initializer::CorrelationInitializer;
79        let init = match mirror_header {
80            Some(h) => CorrelationInitializer::with_mirror(h),
81            None => CorrelationInitializer::default(),
82        };
83        Self {
84            processors: vec![Box::new(init)],
85        }
86    }
87    /// Run the route asynchronously over a mutable `Exchange`.
88    pub async fn run(&self, exchange: &mut Exchange) -> Result<()> {
89        for p in &self.processors {
90            p.process(exchange).await?;
91        }
92        Ok(())
93    }
94}