// camel_processor/set_property.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
9#[derive(Clone)]
10pub struct SetProperty<P> {
11    inner: P,
12    key: String,
13    value: Value,
14}
15
16impl<P> SetProperty<P> {
17    pub fn new(inner: P, key: impl Into<String>, value: impl Into<Value>) -> Self {
18        Self {
19            inner,
20            key: key.into(),
21            value: value.into(),
22        }
23    }
24}
25
26#[derive(Clone)]
27pub struct SetPropertyLayer {
28    key: String,
29    value: Value,
30}
31
32impl SetPropertyLayer {
33    pub fn new(key: impl Into<String>, value: impl Into<Value>) -> Self {
34        Self {
35            key: key.into(),
36            value: value.into(),
37        }
38    }
39}
40
41impl<S> tower::Layer<S> for SetPropertyLayer {
42    type Service = SetProperty<S>;
43
44    fn layer(&self, inner: S) -> Self::Service {
45        SetProperty::new(inner, self.key.clone(), self.value.clone())
46    }
47}
48
49impl<P> Service<Exchange> for SetProperty<P>
50where
51    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
52    P::Future: Send,
53{
54    type Response = Exchange;
55    type Error = CamelError;
56    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
57
58    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59        self.inner.poll_ready(cx)
60    }
61
62    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
63        exchange.set_property(self.key.clone(), self.value.clone());
64        let fut = self.inner.call(exchange);
65        Box::pin(fut)
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{IdentityProcessor, Message};
    use tower::{ServiceBuilder, ServiceExt};

    /// Shorthand for a `Value::String` built from a string literal.
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// A property that was absent on the exchange is present afterwards.
    #[tokio::test]
    async fn test_set_property_adds_property() {
        let processor = SetProperty::new(IdentityProcessor, "source", text("timer"));

        let result = processor
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert_eq!(result.property("source"), Some(&text("timer")));
    }

    /// A property that already exists is replaced by the configured value.
    #[tokio::test]
    async fn test_set_property_overwrites_existing() {
        let mut exchange = Exchange::new(Message::default());
        exchange.set_property("key", text("old"));

        let processor = SetProperty::new(IdentityProcessor, "key", text("new"));
        let result = processor.oneshot(exchange).await.unwrap();

        assert_eq!(result.property("key"), Some(&text("new")));
    }

    /// Setting a property leaves the message body untouched.
    #[tokio::test]
    async fn test_set_property_preserves_body() {
        let processor = SetProperty::new(IdentityProcessor, "prop", Value::Bool(true));

        let result = processor
            .oneshot(Exchange::new(Message::new("body content")))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("body content"));
        assert_eq!(result.property("prop"), Some(&Value::Bool(true)));
    }

    /// The layer form can be stacked with `ServiceBuilder`.
    #[tokio::test]
    async fn test_set_property_layer_composes() {
        let layer = SetPropertyLayer::new("env", text("test"));
        let svc = ServiceBuilder::new().layer(layer).service(IdentityProcessor);

        let result = svc
            .oneshot(Exchange::new(Message::default()))
            .await
            .unwrap();

        assert_eq!(result.property("env"), Some(&text("test")));
    }
}