/// Asynchronous processing step that operates on an `Exchange`.
///
/// Every implementor must also be `Send`, `Sync` and `Debug` (supertrait bounds below).
pub trait Processor:
Send
+ Sync
+ Debug {
// Required method
/// Processes `exchange` through a mutable borrow and reports the outcome.
///
/// Returns a pinned, boxed, `Send` future whose output is `Result<()>`.
/// The `where` clauses require `Self` and both borrows (`'life0` for `self`,
/// `'life1` for `exchange`) to outlive `'async_trait`, the lifetime bound on
/// the returned future.
// NOTE(review): the `'life0` / `'life1` / `'async_trait` names look like the
// expansion of `#[async_trait]`; implementors presumably write
// `async fn process(&self, exchange: &mut Exchange) -> Result<()>` — confirm.
fn process<'life0, 'life1, 'async_trait>(
&'life0 self,
exchange: &'life1 mut Exchange,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
}Expand description
A Processor transforms an Exchange (async-only model).
§Semantics
- Single async `process` method; must be non-blocking (offload heavy CPU work or blocking IO).
- Return `Ok(())` to continue route execution; return `Err(Error)` to short-circuit.
- Mutate `exchange.in_msg`, set `exchange.out_msg`, or add headers as needed; avoid global state.
§Error Propagation
Use specific Error variants for clarity (Error::Processor, Error::Routing, etc.).
§Example
use allora_core::{error::Result, processor::Processor, Exchange, Message};
/// Example processor that upper-cases the inbound text body.
#[derive(Debug)]
struct Upper;

#[async_trait::async_trait]
impl Processor for Upper {
    /// Writes an upper-cased copy of the inbound text body to `out_msg`.
    ///
    /// When `body_text()` yields `None`, `out_msg` is left untouched.
    /// Always returns `Ok(())`.
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        // Build the owned upper-cased text first, then store the outbound message.
        let shouted = exchange.in_msg.body_text().map(|text| text.to_uppercase());
        if let Some(upper) = shouted {
            exchange.out_msg = Some(Message::from_text(upper));
        }
        Ok(())
    }
}
// Drive the processor once on a blocking Tokio runtime and check the result.
let upper = Upper;
let mut exchange = Exchange::new(Message::from_text("hi"));

let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
    upper.process(&mut exchange).await.unwrap();
});

// The processor must have produced an outbound message with the upper-cased body.
let outbound = exchange.out_msg.unwrap();
assert_eq!(outbound.body_text(), Some("HI"));