Skip to main content

camel_processor/
dynamic_set_property.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
/// Middleware that writes one computed property onto an exchange before
/// handing the exchange to the wrapped service.
///
/// The value is produced by evaluating `expr` against the incoming exchange and
/// is stored under `key`.
#[derive(Clone)]
pub struct DynamicSetProperty<P, F> {
    /// Wrapped service that receives the exchange once the property is set.
    inner: P,
    /// Name of the property to write.
    key: String,
    /// Expression evaluated against each exchange to produce the value.
    expr: F,
}
15
16impl<P, F> DynamicSetProperty<P, F>
17where
18    F: Fn(&Exchange) -> Value,
19{
20    pub fn new(inner: P, key: impl Into<String>, expr: F) -> Self {
21        Self {
22            inner,
23            key: key.into(),
24            expr,
25        }
26    }
27}
28
/// `tower::Layer` that wraps a service in a [`DynamicSetProperty`].
///
/// Holds the property name and the value expression; both are cloned into each
/// service the layer produces.
#[derive(Clone)]
pub struct DynamicSetPropertyLayer<F> {
    /// Name of the property the produced services will write.
    key: String,
    /// Expression handed to each produced service.
    expr: F,
}
34
35impl<F> DynamicSetPropertyLayer<F> {
36    pub fn new(key: impl Into<String>, expr: F) -> Self {
37        Self {
38            key: key.into(),
39            expr,
40        }
41    }
42}
43
44impl<S, F> tower::Layer<S> for DynamicSetPropertyLayer<F>
45where
46    F: Clone,
47{
48    type Service = DynamicSetProperty<S, F>;
49
50    fn layer(&self, inner: S) -> Self::Service {
51        DynamicSetProperty {
52            inner,
53            key: self.key.clone(),
54            expr: self.expr.clone(),
55        }
56    }
57}
58
59impl<P, F> Service<Exchange> for DynamicSetProperty<P, F>
60where
61    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
62    P::Future: Send,
63    F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
64{
65    type Response = Exchange;
66    type Error = CamelError;
67    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
68
69    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70        self.inner.poll_ready(cx)
71    }
72
73    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
74        let value = (self.expr)(&exchange);
75        exchange.set_property(self.key.clone(), value);
76        let fut = self.inner.call(exchange);
77        Box::pin(fut)
78    }
79}
80
#[cfg(test)]
mod tests {
    use camel_api::{Exchange, IdentityProcessor, Message, Value};
    use tower::ServiceExt;

    use super::*;

    // The property value can be derived from the message body.
    #[tokio::test]
    async fn test_dynamic_set_property_from_body() {
        let processor = DynamicSetProperty::new(IdentityProcessor, "greeting", |ex: &Exchange| {
            Value::String(format!("hello {}", ex.input.body.as_text().unwrap_or("")))
        });

        let input = Exchange::new(Message::new("world"));
        let output = processor.oneshot(input).await.unwrap();

        let expected = Value::String("hello world".into());
        assert_eq!(output.property("greeting"), Some(&expected));
    }

    // A property that already exists under the same key is replaced.
    #[tokio::test]
    async fn test_dynamic_set_property_overwrites_existing() {
        let processor = DynamicSetProperty::new(IdentityProcessor, "key", |ex: &Exchange| {
            Value::String(ex.input.body.as_text().unwrap_or("").into())
        });

        let mut input = Exchange::new(Message::new("new"));
        input.set_property("key", Value::String("old".into()));

        let output = processor.oneshot(input).await.unwrap();
        assert_eq!(output.property("key"), Some(&Value::String("new".into())));
    }

    // Setting a property leaves the message body as it was.
    #[tokio::test]
    async fn test_dynamic_set_property_preserves_body() {
        let processor = DynamicSetProperty::new(IdentityProcessor, "len", |ex: &Exchange| {
            let len = ex.input.body.as_text().map(|t| t.len() as i64).unwrap_or(0);
            Value::Number(len.into())
        });

        let input = Exchange::new(Message::new("body content"));
        let output = processor.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("body content"));
        assert_eq!(output.property("len"), Some(&Value::Number(12.into())));
    }

    // The layer form composes through `ServiceBuilder`.
    #[tokio::test]
    async fn test_dynamic_set_property_layer_composes() {
        use tower::ServiceBuilder;

        let layer = DynamicSetPropertyLayer::new("computed", |_ex: &Exchange| Value::Bool(true));
        let processor = ServiceBuilder::new().layer(layer).service(IdentityProcessor);

        let input = Exchange::new(Message::default());
        let output = processor.oneshot(input).await.unwrap();
        assert_eq!(output.property("computed"), Some(&Value::Bool(true)));
    }
}