Skip to main content

camel_api/
exchange.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
13/// The exchange pattern (fire-and-forget or request-reply).
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
15pub enum ExchangePattern {
16    /// Fire-and-forget: message sent, no reply expected.
17    #[default]
18    InOnly,
19    /// Request-reply: message sent, reply expected.
20    InOut,
21}
22
23/// An Exchange represents a message being routed through the framework.
24///
25/// It contains the input message, an optional output message,
26/// properties for passing data between processors, and error state.
27#[derive(Debug)]
28pub struct Exchange {
29    /// The input (incoming) message.
30    pub input: Message,
31    /// The output (response) message, populated for InOut patterns.
32    pub output: Option<Message>,
33    /// Exchange-scoped properties for passing data between processors.
34    pub properties: HashMap<String, Value>,
35    /// Non-serializable extension values (e.g., channel senders).
36    /// Stored as `Arc<dyn Any + Send + Sync>` so cloning is cheap (ref-count bump).
37    pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
38    /// Error state, if processing failed.
39    pub error: Option<CamelError>,
40    /// The exchange pattern.
41    pub pattern: ExchangePattern,
42    /// Unique correlation ID for distributed tracing.
43    pub correlation_id: String,
44    /// OpenTelemetry context for distributed tracing propagation.
45    /// Carries the active span context between processing steps.
46    /// Defaults to an empty context (noop span) if OTel is not active.
47    pub otel_context: Context,
48}
49
50impl Exchange {
51    /// Create a new exchange with the given input message.
52    pub fn new(input: Message) -> Self {
53        Self {
54            input,
55            output: None,
56            properties: HashMap::new(),
57            extensions: HashMap::new(),
58            error: None,
59            pattern: ExchangePattern::default(),
60            correlation_id: Uuid::new_v4().to_string(),
61            otel_context: Context::new(),
62        }
63    }
64
65    /// Create a new exchange with the InOut pattern.
66    pub fn new_in_out(input: Message) -> Self {
67        Self {
68            input,
69            output: None,
70            properties: HashMap::new(),
71            extensions: HashMap::new(),
72            error: None,
73            pattern: ExchangePattern::InOut,
74            correlation_id: Uuid::new_v4().to_string(),
75            otel_context: Context::new(),
76        }
77    }
78
79    /// Get the correlation ID for this exchange.
80    pub fn correlation_id(&self) -> &str {
81        &self.correlation_id
82    }
83
84    /// Get a property value.
85    pub fn property(&self, key: &str) -> Option<&Value> {
86        self.properties.get(key)
87    }
88
89    /// Set a property value.
90    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
91        self.properties.insert(key.into(), value.into());
92    }
93
94    /// Check if the exchange has an error.
95    pub fn has_error(&self) -> bool {
96        self.error.is_some()
97    }
98
99    /// Set an error on this exchange.
100    pub fn set_error(&mut self, error: CamelError) {
101        self.error = Some(error);
102    }
103
104    /// Store a non-serializable extension value (e.g. a channel sender).
105    pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
106        self.extensions.insert(key.into(), value);
107    }
108
109    /// Retrieve a typed extension value. Returns `None` if the key is absent
110    /// or the stored value is not of type `T`.
111    pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
112        self.extensions.get(key)?.downcast_ref::<T>()
113    }
114
115    /// Deserialize the body into type `T`.
116    ///
117    /// Uses built-in conversions for `String`, `Vec<u8>`, [`bytes::Bytes`], and
118    /// `serde_json::Value`. For custom types, implement [`FromBody`] or use
119    /// [`impl_from_body_via_serde!`].
120    ///
121    /// # Example
122    /// ```rust,ignore
123    /// let text: String = exchange.body_as::<String>()?;
124    /// let raw: Vec<u8> = exchange.body_as::<Vec<u8>>()?;
125    /// ```
126    pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
127        T::from_body(&self.input.body)
128    }
129}
130
131impl Clone for Exchange {
132    fn clone(&self) -> Self {
133        Self {
134            input: self.input.clone(),
135            output: self.output.clone(),
136            properties: self.properties.clone(),
137            extensions: self.extensions.clone(), // Arc ref-count bump, cheap
138            error: self.error.clone(),
139            pattern: self.pattern,
140            correlation_id: self.correlation_id.clone(),
141            otel_context: self.otel_context.clone(),
142        }
143    }
144}
145
146impl Default for Exchange {
147    fn default() -> Self {
148        Self::new(Message::default())
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155    use crate::Body;
156    use serde_json::json;
157
158    #[test]
159    fn test_exchange_new() {
160        let msg = Message::new("test");
161        let ex = Exchange::new(msg);
162        assert_eq!(ex.input.body.as_text(), Some("test"));
163        assert!(ex.output.is_none());
164        assert!(!ex.has_error());
165        assert_eq!(ex.pattern, ExchangePattern::InOnly);
166    }
167
168    #[test]
169    fn test_exchange_in_out() {
170        let ex = Exchange::new_in_out(Message::default());
171        assert_eq!(ex.pattern, ExchangePattern::InOut);
172    }
173
174    #[test]
175    fn test_exchange_properties() {
176        let mut ex = Exchange::default();
177        ex.set_property("key", Value::Bool(true));
178        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
179        assert_eq!(ex.property("missing"), None);
180    }
181
182    #[test]
183    fn test_exchange_error() {
184        let mut ex = Exchange::default();
185        assert!(!ex.has_error());
186        ex.set_error(CamelError::ProcessorError("test".into()));
187        assert!(ex.has_error());
188    }
189
190    #[test]
191    fn test_exchange_lifecycle() {
192        let mut ex = Exchange::new(Message::new("input data"));
193        assert_eq!(ex.input.body.as_text(), Some("input data"));
194
195        // Set some properties
196        ex.set_property("processed", Value::Bool(true));
197
198        // Set output
199        ex.output = Some(Message::new("output data"));
200        assert!(ex.output.is_some());
201
202        // Verify no error
203        assert!(!ex.has_error());
204    }
205
206    #[test]
207    fn test_exchange_otel_context_default() {
208        let ex = Exchange::default();
209        // Field must exist and be accessible — compilation is the test
210        // Also verify it's a fresh context (noop span)
211        use opentelemetry::trace::TraceContextExt;
212        assert!(!ex.otel_context.span().span_context().is_valid());
213    }
214
215    #[test]
216    fn test_exchange_otel_context_propagates_in_clone() {
217        let ex = Exchange::default();
218        let cloned = ex.clone();
219        // Both should have the same (empty) context
220        use opentelemetry::trace::TraceContextExt;
221        assert!(!cloned.otel_context.span().span_context().is_valid());
222    }
223
224    #[test]
225    fn test_set_and_get_extension() {
226        use std::sync::Arc;
227        let mut ex = Exchange::default();
228        ex.set_extension("my.key", Arc::new(42u32));
229        let val: Option<&u32> = ex.get_extension("my.key");
230        assert_eq!(val, Some(&42u32));
231    }
232
233    #[test]
234    fn test_get_extension_wrong_type_returns_none() {
235        use std::sync::Arc;
236        let mut ex = Exchange::default();
237        ex.set_extension("my.key", Arc::new(42u32));
238        let val: Option<&String> = ex.get_extension("my.key");
239        assert!(val.is_none());
240    }
241
242    #[test]
243    fn test_get_extension_missing_key_returns_none() {
244        let ex = Exchange::default();
245        let val: Option<&u32> = ex.get_extension("nope");
246        assert!(val.is_none());
247    }
248
249    #[test]
250    fn test_clone_shares_extension_arc() {
251        use std::sync::Arc;
252        let mut ex = Exchange::default();
253        ex.set_extension("shared", Arc::new(99u64));
254        let cloned = ex.clone();
255        // Both see the same value
256        assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
257        assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));
258    }
259
260    #[test]
261    fn test_body_as_string_from_text() {
262        let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));
263
264        let result = ex.body_as::<String>();
265
266        assert_eq!(result.unwrap(), "hello");
267    }
268
269    #[test]
270    fn test_body_as_string_from_json_string() {
271        let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));
272
273        let result = ex.body_as::<String>();
274
275        assert_eq!(result.unwrap(), "hello");
276    }
277
278    #[test]
279    fn test_body_as_json_value_from_json_number() {
280        let ex = Exchange::new(Message::new(Body::Json(json!(42))));
281
282        let result = ex.body_as::<serde_json::Value>();
283
284        assert_eq!(result.unwrap(), json!(42));
285    }
286
287    #[test]
288    fn test_body_as_vec_u8_from_bytes() {
289        let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));
290
291        let result = ex.body_as::<Vec<u8>>();
292
293        assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
294    }
295
296    #[test]
297    fn test_body_as_string_from_empty_returns_err() {
298        let ex = Exchange::new(Message::new(Body::Empty));
299
300        let result = ex.body_as::<String>();
301
302        assert!(result.is_err());
303    }
304}