Skip to main content

camel_processor/
content_enricher.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use camel_api::{BoxProcessor, CamelError, Exchange};
8use camel_component_api::endpoint::PollingConsumer;
9use tower::Service;
10use tower::ServiceExt;
11
12use crate::EnrichmentStrategy;
13
14/// EIP-7 enrich: synchronous content enrichment via a resolved producer.
15#[derive(Clone)]
16pub struct EnrichService {
17    producer: BoxProcessor,
18    strategy: Arc<dyn EnrichmentStrategy>,
19}
20
21impl EnrichService {
22    pub fn new(producer: BoxProcessor, strategy: Arc<dyn EnrichmentStrategy>) -> Self {
23        Self { producer, strategy }
24    }
25}
26
27impl Service<Exchange> for EnrichService {
28    type Response = Exchange;
29    type Error = CamelError;
30    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33        // Propagate producer backpressure (matches repo convention: see wire_tap.rs,
34        // security_policy_layer.rs, throttler.rs). EnrichService must not claim
35        // readiness while the wrapped producer still applies backpressure.
36        self.producer.poll_ready(cx)
37    }
38
39    fn call(&mut self, original: Exchange) -> Self::Future {
40        // BoxProcessor is BoxCloneService — clone for the async block.
41        let producer = self.producer.clone();
42        let strategy = Arc::clone(&self.strategy);
43        Box::pin(async move {
44            let enriched = producer.oneshot(original.clone()).await?;
45            Ok(strategy.aggregate(original, enriched))
46        })
47    }
48}
49
50/// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
51///
52/// Wraps the `Box<dyn PollingConsumer>` in `Arc<tokio::sync::Mutex<...>>`
53/// because the trait requires `&mut self` for `receive` and `Box<dyn ...>`
54/// is not Clone. The mutex is held only across the `receive().await` call,
55/// not across the strategy aggregation.
56#[derive(Clone)]
57pub struct PollEnrichService {
58    poller: Arc<tokio::sync::Mutex<Box<dyn PollingConsumer>>>,
59    timeout: Duration,
60    strategy: Arc<dyn EnrichmentStrategy>,
61}
62
63impl PollEnrichService {
64    pub fn new(
65        poller: Box<dyn PollingConsumer>,
66        timeout: Duration,
67        strategy: Arc<dyn EnrichmentStrategy>,
68    ) -> Self {
69        Self {
70            poller: Arc::new(tokio::sync::Mutex::new(poller)),
71            timeout,
72            strategy,
73        }
74    }
75}
76
77impl Service<Exchange> for PollEnrichService {
78    type Response = Exchange;
79    type Error = CamelError;
80    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
81
82    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83        Poll::Ready(Ok(()))
84    }
85
86    fn call(&mut self, original: Exchange) -> Self::Future {
87        let poller = Arc::clone(&self.poller);
88        let strategy = Arc::clone(&self.strategy);
89        let timeout = self.timeout;
90        Box::pin(async move {
91            let mut guard = poller.lock().await;
92            let enriched_opt = guard.receive(timeout).await?;
93            drop(guard); // release before strategy to minimize critical section
94            match enriched_opt {
95                Some(enriched) => Ok(strategy.aggregate(original, enriched)),
96                None => Ok(original), // no message in window: pass through
97            }
98        })
99    }
100}