camel_processor/
enrichment_strategy.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use camel_api::{CamelError, Exchange};
5
6#[async_trait]
12pub trait EnrichmentStrategy: Send + Sync {
13 fn aggregate(&self, original: Exchange, enriched: Exchange) -> Exchange;
14
15 async fn on_no_poll(&self, original: Exchange) -> Result<Exchange, CamelError> {
20 Ok(original)
21 }
22}
23
/// Enrichment strategy whose result is the original exchange carrying the
/// input body of the enriched exchange.
///
/// This is a stateless unit struct, so it derives the common traits: it can be
/// debug-printed, copied freely and built through [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct UseEnrichedBody;
29
30#[async_trait]
31impl EnrichmentStrategy for UseEnrichedBody {
32 fn aggregate(&self, mut original: Exchange, enriched: Exchange) -> Exchange {
33 original.input.body = enriched.input.body;
35 original
36 }
37 }
39
40pub struct ThrowOnNoPoll {
44 inner: Arc<dyn EnrichmentStrategy>,
45}
46
47impl ThrowOnNoPoll {
48 pub fn new(inner: Arc<dyn EnrichmentStrategy>) -> Self {
49 Self { inner }
50 }
51}
52
53#[async_trait]
54impl EnrichmentStrategy for ThrowOnNoPoll {
55 fn aggregate(&self, original: Exchange, enriched: Exchange) -> Exchange {
56 self.inner.aggregate(original, enriched)
57 }
58
59 async fn on_no_poll(&self, _original: Exchange) -> Result<Exchange, CamelError> {
60 Err(CamelError::ProcessorError(
61 "poll-enrich: no message available".into(),
62 ))
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, Message};

    /// Builds an exchange around a message created from `body`.
    fn exchange(body: &str) -> Exchange {
        let message = Message::new(body);
        Exchange::new(message)
    }

    /// Aggregating with `UseEnrichedBody` must yield the enriched text body.
    #[test]
    fn use_enriched_body_replaces_body() {
        let merged = UseEnrichedBody.aggregate(exchange("original"), exchange("enriched"));

        if let Body::Text(text) = &merged.input.body {
            assert_eq!(text, "enriched");
        } else {
            let other = &merged.input.body;
            panic!("unexpected body {other:?}");
        }
    }
}