Skip to main content

coreon_core/
processor.rs

//! Processor — async transformation of an Exchange.
2
3use crate::{error::Result, exchange::Exchange};
4use async_trait::async_trait;
5use std::sync::Arc;
6
7/// The central abstraction. Processors are composed into pipelines; they
8/// transform the Exchange in place (or route it elsewhere) and return `Ok(())`
9/// to let the pipeline continue, or an error to trigger the error handler.
10#[async_trait]
11pub trait Processor: Send + Sync {
12    async fn process(&self, exchange: &mut Exchange) -> Result<()>;
13}
14
/// Wraps a plain closure so it can be used wherever a `Processor` is expected.
///
/// The wrapped closure has to be `Send + Sync + 'static` and must return
/// `Result<()>`. Because async closures are not stable yet, only synchronous
/// closures are accepted here; anything that needs to await should be written
/// as its own `Processor` implementation instead.
pub struct FnProcessor<F>(F);
21
22impl<F> FnProcessor<F>
23where
24    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
25{
26    pub fn new(f: F) -> Arc<Self> {
27        Arc::new(Self(f))
28    }
29}
30
31#[async_trait]
32impl<F> Processor for FnProcessor<F>
33where
34    F: Fn(&mut Exchange) -> Result<()> + Send + Sync + 'static,
35{
36    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
37        (self.0)(exchange)
38    }
39}
40
41/// Predicate — a boolean test over an Exchange. Separate from Processor
42/// because EIPs like Filter/Choice compose predicates, not processors.
43pub trait Predicate: Send + Sync {
44    fn matches(&self, exchange: &Exchange) -> bool;
45}
46
47impl<F> Predicate for F
48where
49    F: Fn(&Exchange) -> bool + Send + Sync,
50{
51    fn matches(&self, exchange: &Exchange) -> bool {
52        self(exchange)
53    }
54}