Skip to main content

camel_processor/
enrichment_strategy.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use camel_api::{CamelError, Exchange};
5
6/// Merges the original Exchange with the enriched/polled Exchange in the EIP-7
7/// `enrich` and `pollEnrich` verbs.
8///
9/// Distinct from the EIP-22 `AggregationStrategy` family (which combines many
10/// Exchanges into one); `EnrichmentStrategy` is always 2→1 (original + enriched).
11#[async_trait]
12pub trait EnrichmentStrategy: Send + Sync {
13    fn aggregate(&self, original: Exchange, enriched: Exchange) -> Exchange;
14
15    /// Called when pollEnrich receives no message (`None` from
16    /// `PollingConsumer::receive`). The default implementation passes the
17    /// original exchange through unchanged, matching Camel 4.x passthrough
18    /// semantics. Override to throw an error or perform other actions.
19    async fn on_no_poll(&self, original: Exchange) -> Result<Exchange, CamelError> {
20        Ok(original)
21    }
22}
23
24/// Default strategy: discard the original body, replace with the enriched body.
25/// Headers and properties from the original are preserved; enriched headers
26/// are discarded. Implement a custom `EnrichmentStrategy` if you need to merge
27/// headers from the enriched exchange.
28pub struct UseEnrichedBody;
29
30#[async_trait]
31impl EnrichmentStrategy for UseEnrichedBody {
32    fn aggregate(&self, mut original: Exchange, enriched: Exchange) -> Exchange {
33        // Replace the body from the enriched exchange, but keep original headers/properties.
34        original.input.body = enriched.input.body;
35        original
36    }
37    // on_no_poll uses the default passthrough
38}
39
40/// Strategy that throws a [`CamelError::ProcessorError`] when pollEnrich
41/// receives no message. Wraps a base [`EnrichmentStrategy`] for the
42/// `aggregate` behaviour.
43pub struct ThrowOnNoPoll {
44    inner: Arc<dyn EnrichmentStrategy>,
45}
46
47impl ThrowOnNoPoll {
48    pub fn new(inner: Arc<dyn EnrichmentStrategy>) -> Self {
49        Self { inner }
50    }
51}
52
53#[async_trait]
54impl EnrichmentStrategy for ThrowOnNoPoll {
55    fn aggregate(&self, original: Exchange, enriched: Exchange) -> Exchange {
56        self.inner.aggregate(original, enriched)
57    }
58
59    async fn on_no_poll(&self, _original: Exchange) -> Result<Exchange, CamelError> {
60        Err(CamelError::ProcessorError(
61            "poll-enrich: no message available".into(),
62        ))
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use super::*;
69    use camel_api::{Body, Message};
70
71    fn exchange(body: &str) -> Exchange {
72        Exchange::new(Message::new(body))
73    }
74
75    #[test]
76    fn use_enriched_body_replaces_body() {
77        let strategy = UseEnrichedBody;
78        let result = strategy.aggregate(exchange("original"), exchange("enriched"));
79        match &result.input.body {
80            Body::Text(s) => assert_eq!(s, "enriched"),
81            other => panic!("unexpected body {other:?}"),
82        }
83    }
84}