allora_core/
processor.rs

1use crate::{error::Result, Exchange};
2use std::fmt::{Debug, Formatter, Result as FmtResult};
3
4/// A Processor transforms an Exchange (async-only model).
5///
6/// # Semantics
7/// * Single async `process` method; must be non-blocking (offload heavy CPU or blocking IO).
8/// * Return `Ok(())` to continue route execution; return `Err(Error)` to short-circuit.
9/// * Mutate `exchange.in_msg`, set `exchange.out_msg`, or add headers as needed; avoid global state.
10///
11/// # Error Propagation
12/// Use specific `Error` variants for clarity (`Error::Processor`, `Error::Routing`, etc.).
13///
14/// # Example
15/// ```rust
16/// use allora_core::{error::Result, processor::Processor, Exchange, Message};
17/// #[derive(Debug)] struct Upper;
18/// #[async_trait::async_trait]
19/// impl Processor for Upper {
20///     async fn process(&self, exchange: &mut Exchange) -> Result<()> {
21///         if let Some(body) = exchange.in_msg.body_text() {
22///             exchange.out_msg = Some(Message::from_text(body.to_uppercase()));
23///         }
24///         Ok(())
25///     }
26/// }
27/// let p = Upper;
28/// let mut ex = Exchange::new(Message::from_text("hi"));
29/// tokio::runtime::Runtime::new().unwrap().block_on(async { p.process(&mut ex).await.unwrap(); });
30/// assert_eq!(ex.out_msg.unwrap().body_text(), Some("HI"));
31/// ```
32#[async_trait::async_trait]
33pub trait Processor: Send + Sync + Debug {
34    async fn process(&self, exchange: &mut Exchange) -> Result<()>;
35}
36
37#[derive()]
38pub struct ClosureProcessor<F>
39where
40    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
41{
42    func: F,
43}
44
45impl<F> ClosureProcessor<F>
46where
47    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
48{
49    pub fn new(func: F) -> Self {
50        Self { func }
51    }
52    /// Convenience helper identical to `new` for readability in fluent route construction.
53    pub fn closure(func: F) -> Self {
54        Self { func }
55    }
56}
57
58#[async_trait::async_trait]
59impl<F> Processor for ClosureProcessor<F>
60where
61    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
62{
63    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
64        (self.func)(exchange)
65    }
66}
67
68impl<F> Debug for ClosureProcessor<F>
69where
70    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
71{
72    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
73        f.write_str("ClosureProcessor{func=*closure*}")
74    }
75}
76
77/// Boxed dynamic processor type.
78/// Useful for heterogeneous collections (used internally by `Route`).
79pub type BoxedProcessor = Box<dyn Processor>;