camel_processor/
content_enricher.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use camel_api::{BoxProcessor, CamelError, Exchange};
8use camel_component_api::endpoint::PollingConsumer;
9use tower::Service;
10use tower::ServiceExt;
11
12use crate::EnrichmentStrategy;
13
14#[derive(Clone)]
16pub struct EnrichService {
17 producer: BoxProcessor,
18 strategy: Arc<dyn EnrichmentStrategy>,
19}
20
21impl EnrichService {
22 pub fn new(producer: BoxProcessor, strategy: Arc<dyn EnrichmentStrategy>) -> Self {
23 Self { producer, strategy }
24 }
25}
26
27impl Service<Exchange> for EnrichService {
28 type Response = Exchange;
29 type Error = CamelError;
30 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
31
32 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33 self.producer.poll_ready(cx)
37 }
38
39 fn call(&mut self, original: Exchange) -> Self::Future {
40 let producer = self.producer.clone();
42 let strategy = Arc::clone(&self.strategy);
43 Box::pin(async move {
44 let enriched = producer.oneshot(original.clone()).await?;
45 Ok(strategy.aggregate(original, enriched))
46 })
47 }
48}
49
50#[derive(Clone)]
57pub struct PollEnrichService {
58 poller: Arc<tokio::sync::Mutex<Box<dyn PollingConsumer>>>,
59 timeout: Duration,
60 strategy: Arc<dyn EnrichmentStrategy>,
61}
62
63impl PollEnrichService {
64 pub fn new(
65 poller: Box<dyn PollingConsumer>,
66 timeout: Duration,
67 strategy: Arc<dyn EnrichmentStrategy>,
68 ) -> Self {
69 Self {
70 poller: Arc::new(tokio::sync::Mutex::new(poller)),
71 timeout,
72 strategy,
73 }
74 }
75}
76
77impl Service<Exchange> for PollEnrichService {
78 type Response = Exchange;
79 type Error = CamelError;
80 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
81
82 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83 Poll::Ready(Ok(()))
84 }
85
86 fn call(&mut self, original: Exchange) -> Self::Future {
87 let poller = Arc::clone(&self.poller);
88 let strategy = Arc::clone(&self.strategy);
89 let timeout = self.timeout;
90 Box::pin(async move {
91 let mut guard = poller.lock().await;
92 let enriched_opt = guard.receive(timeout).await?;
93 drop(guard); match enriched_opt {
95 Some(enriched) => Ok(strategy.aggregate(original, enriched)),
96 None => Ok(original), }
98 })
99 }
100}