Skip to main content

camel_api/
exchange.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
13/// Property key for the exception message (error Display string).
14pub const PROPERTY_EXCEPTION_MESSAGE: &str = "CamelExceptionMessage";
15/// Property key for the exception kind (error variant name via classify()).
16pub const PROPERTY_EXCEPTION_KIND: &str = "CamelExceptionKind";
17/// Property key for the caught exception (Java Camel parity alias).
18pub const PROPERTY_EXCEPTION_CAUGHT: &str = "CamelExceptionCaught";
19/// Property key set when an exception has been handled (Java Camel parity).
20pub const PROPERTY_EXCEPTION_HANDLED: &str = "CamelExceptionHandled";
21
22/// The exchange pattern (fire-and-forget or request-reply).
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
24pub enum ExchangePattern {
25    /// Fire-and-forget: message sent, no reply expected.
26    #[default]
27    InOnly,
28    /// Request-reply: message sent, reply expected.
29    InOut,
30}
31
32/// An Exchange represents a message being routed through the framework.
33///
34/// It contains the input message, an optional output message,
35/// properties for passing data between processors, and error state.
36#[derive(Debug)]
37pub struct Exchange {
38    /// The input (incoming) message.
39    pub input: Message,
40    /// The output (response) message, populated for InOut patterns.
41    pub output: Option<Message>,
42    /// Exchange-scoped properties for passing data between processors.
43    pub properties: HashMap<String, Value>,
44    /// Non-serializable extension values (e.g., channel senders).
45    /// Stored as `Arc<dyn Any + Send + Sync>` so cloning is cheap (ref-count bump).
46    pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
47    /// Error state, if processing failed.
48    pub error: Option<CamelError>,
49    /// The exchange pattern.
50    pub pattern: ExchangePattern,
51    /// Unique correlation ID for distributed tracing.
52    pub correlation_id: String,
53    /// OpenTelemetry context for distributed tracing propagation.
54    /// Carries the active span context between processing steps.
55    /// Defaults to an empty context (noop span) if OTel is not active.
56    pub otel_context: Context,
57}
58
59impl Exchange {
60    /// Create a new exchange with the given input message.
61    pub fn new(input: Message) -> Self {
62        Self {
63            input,
64            output: None,
65            properties: HashMap::new(),
66            extensions: HashMap::new(),
67            error: None,
68            pattern: ExchangePattern::default(),
69            correlation_id: Uuid::new_v4().to_string(),
70            otel_context: Context::new(),
71        }
72    }
73
74    /// Create a new exchange with the InOut pattern.
75    pub fn new_in_out(input: Message) -> Self {
76        Self {
77            input,
78            output: None,
79            properties: HashMap::new(),
80            extensions: HashMap::new(),
81            error: None,
82            pattern: ExchangePattern::InOut,
83            correlation_id: Uuid::new_v4().to_string(),
84            otel_context: Context::new(),
85        }
86    }
87
88    /// Get the correlation ID for this exchange.
89    pub fn correlation_id(&self) -> &str {
90        &self.correlation_id
91    }
92
93    /// Get a property value.
94    pub fn property(&self, key: &str) -> Option<&Value> {
95        self.properties.get(key)
96    }
97
98    /// Set a property value.
99    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
100        self.properties.insert(key.into(), value.into());
101    }
102
103    /// Check if the exchange has an error.
104    pub fn has_error(&self) -> bool {
105        self.error.is_some()
106    }
107
108    /// Set an error on this exchange.
109    ///
110    /// Automatically populates exchange properties with error context so that
111    /// all languages can access error information via their property mechanisms:
112    /// - `CamelExceptionMessage` — the error's Display string
113    /// - `CamelExceptionKind` — the error variant name (via `CamelError::classify()`)
114    /// - `CamelExceptionCaught` — alias for Java Camel parity
115    pub fn set_error(&mut self, error: CamelError) {
116        let msg = error.to_string();
117        let kind = error.classify().to_string();
118        self.properties.insert(
119            PROPERTY_EXCEPTION_MESSAGE.to_string(),
120            Value::String(msg.clone()),
121        );
122        self.properties
123            .insert(PROPERTY_EXCEPTION_KIND.to_string(), Value::String(kind));
124        self.properties
125            .insert(PROPERTY_EXCEPTION_CAUGHT.to_string(), Value::String(msg));
126        self.error = Some(error);
127    }
128
129    /// Clear the error and remove all exception properties.
130    ///
131    /// Called after `handled:true` in on_exception steps to prevent stale
132    /// error state from leaking into subsequent processing.
133    pub fn clear_error(&mut self) {
134        self.error = None;
135        self.properties.remove(PROPERTY_EXCEPTION_MESSAGE);
136        self.properties.remove(PROPERTY_EXCEPTION_KIND);
137        self.properties.remove(PROPERTY_EXCEPTION_CAUGHT);
138    }
139
140    /// Mark the exception as handled and clear the error.
141    ///
142    /// Sets `CamelExceptionHandled = true` then calls `clear_error()`.
143    /// This matches Java Camel's `Exchange.EXCEPTION_HANDLED` semantics.
144    pub fn handle_error(&mut self) {
145        self.properties
146            .insert(PROPERTY_EXCEPTION_HANDLED.to_string(), Value::Bool(true));
147        self.clear_error();
148    }
149
150    /// Store a non-serializable extension value (e.g. a channel sender).
151    pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
152        self.extensions.insert(key.into(), value);
153    }
154
155    /// Retrieve a typed extension value. Returns `None` if the key is absent
156    /// or the stored value is not of type `T`.
157    pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
158        self.extensions.get(key)?.downcast_ref::<T>()
159    }
160
161    /// Deserialize the body into type `T`.
162    ///
163    /// Uses built-in conversions for `String`, `Vec<u8>`, [`bytes::Bytes`], and
164    /// `serde_json::Value`. For custom types, implement [`FromBody`] or use
165    /// [`impl_from_body_via_serde!`].
166    ///
167    /// # Example
168    /// ```rust,ignore
169    /// let text: String = exchange.body_as::<String>()?;
170    /// let raw: Vec<u8> = exchange.body_as::<Vec<u8>>()?;
171    /// ```
172    pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
173        T::from_body(&self.input.body)
174    }
175}
176
177impl Clone for Exchange {
178    fn clone(&self) -> Self {
179        Self {
180            input: self.input.clone(),
181            output: self.output.clone(),
182            properties: self.properties.clone(),
183            extensions: self.extensions.clone(), // Arc ref-count bump, cheap
184            error: self.error.clone(),
185            pattern: self.pattern,
186            correlation_id: self.correlation_id.clone(),
187            otel_context: self.otel_context.clone(),
188        }
189    }
190}
191
192impl Default for Exchange {
193    fn default() -> Self {
194        Self::new(Message::default())
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201    use crate::Body;
202    use serde_json::json;
203
204    #[test]
205    fn test_exchange_new() {
206        let msg = Message::new("test");
207        let ex = Exchange::new(msg);
208        assert_eq!(ex.input.body.as_text(), Some("test"));
209        assert!(ex.output.is_none());
210        assert!(!ex.has_error());
211        assert_eq!(ex.pattern, ExchangePattern::InOnly);
212    }
213
214    #[test]
215    fn test_exchange_in_out() {
216        let ex = Exchange::new_in_out(Message::default());
217        assert_eq!(ex.pattern, ExchangePattern::InOut);
218    }
219
220    #[test]
221    fn test_exchange_properties() {
222        let mut ex = Exchange::default();
223        ex.set_property("key", Value::Bool(true));
224        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
225        assert_eq!(ex.property("missing"), None);
226    }
227
228    #[test]
229    fn test_exchange_error() {
230        let mut ex = Exchange::default();
231        assert!(!ex.has_error());
232        ex.set_error(CamelError::ProcessorError("test".into()));
233        assert!(ex.has_error());
234    }
235
236    #[test]
237    fn test_set_error_populates_properties() {
238        let mut ex = Exchange::default();
239        ex.set_error(CamelError::ProcessorError("boom".into()));
240
241        assert!(ex.has_error());
242        assert_eq!(
243            ex.properties.get(PROPERTY_EXCEPTION_MESSAGE),
244            Some(&Value::String("Processor error: boom".to_string()))
245        );
246        assert_eq!(
247            ex.properties.get(PROPERTY_EXCEPTION_KIND),
248            Some(&Value::String("processor".to_string()))
249        );
250        assert_eq!(
251            ex.properties.get(PROPERTY_EXCEPTION_CAUGHT),
252            Some(&Value::String("Processor error: boom".to_string()))
253        );
254    }
255
256    #[test]
257    fn test_clear_error_removes_properties() {
258        let mut ex = Exchange::default();
259        ex.set_error(CamelError::RouteError("fail".into()));
260        assert!(ex.has_error());
261        assert!(ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
262
263        ex.clear_error();
264
265        assert!(!ex.has_error());
266        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
267        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_KIND));
268        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_CAUGHT));
269    }
270
271    #[test]
272    fn test_exchange_lifecycle() {
273        let mut ex = Exchange::new(Message::new("input data"));
274        assert_eq!(ex.input.body.as_text(), Some("input data"));
275
276        // Set some properties
277        ex.set_property("processed", Value::Bool(true));
278
279        // Set output
280        ex.output = Some(Message::new("output data"));
281        assert!(ex.output.is_some());
282
283        // Verify no error
284        assert!(!ex.has_error());
285    }
286
287    #[test]
288    fn test_exchange_otel_context_default() {
289        let ex = Exchange::default();
290        // Field must exist and be accessible — compilation is the test
291        // Also verify it's a fresh context (noop span)
292        use opentelemetry::trace::TraceContextExt;
293        assert!(!ex.otel_context.span().span_context().is_valid());
294    }
295
296    #[test]
297    fn test_exchange_otel_context_propagates_in_clone() {
298        let ex = Exchange::default();
299        let cloned = ex.clone();
300        // Both should have the same (empty) context
301        use opentelemetry::trace::TraceContextExt;
302        assert!(!cloned.otel_context.span().span_context().is_valid());
303    }
304
305    #[test]
306    fn test_set_and_get_extension() {
307        use std::sync::Arc;
308        let mut ex = Exchange::default();
309        ex.set_extension("my.key", Arc::new(42u32));
310        let val: Option<&u32> = ex.get_extension("my.key");
311        assert_eq!(val, Some(&42u32));
312    }
313
314    #[test]
315    fn test_get_extension_wrong_type_returns_none() {
316        use std::sync::Arc;
317        let mut ex = Exchange::default();
318        ex.set_extension("my.key", Arc::new(42u32));
319        let val: Option<&String> = ex.get_extension("my.key");
320        assert!(val.is_none());
321    }
322
323    #[test]
324    fn test_get_extension_missing_key_returns_none() {
325        let ex = Exchange::default();
326        let val: Option<&u32> = ex.get_extension("nope");
327        assert!(val.is_none());
328    }
329
330    #[test]
331    fn test_clone_shares_extension_arc() {
332        use std::sync::Arc;
333        let mut ex = Exchange::default();
334        ex.set_extension("shared", Arc::new(99u64));
335        let cloned = ex.clone();
336        // Both see the same value
337        assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
338        assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));
339    }
340
341    #[test]
342    fn test_body_as_string_from_text() {
343        let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));
344
345        let result = ex.body_as::<String>();
346
347        assert_eq!(result.unwrap(), "hello");
348    }
349
350    #[test]
351    fn test_body_as_string_from_json_string() {
352        let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));
353
354        let result = ex.body_as::<String>();
355
356        assert_eq!(result.unwrap(), "hello");
357    }
358
359    #[test]
360    fn test_body_as_json_value_from_json_number() {
361        let ex = Exchange::new(Message::new(Body::Json(json!(42))));
362
363        let result = ex.body_as::<serde_json::Value>();
364
365        assert_eq!(result.unwrap(), json!(42));
366    }
367
368    #[test]
369    fn test_body_as_vec_u8_from_bytes() {
370        let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));
371
372        let result = ex.body_as::<Vec<u8>>();
373
374        assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
375    }
376
377    #[test]
378    fn test_body_as_string_from_empty_returns_err() {
379        let ex = Exchange::new(Message::new(Body::Empty));
380
381        let result = ex.body_as::<String>();
382
383        assert!(result.is_err());
384    }
385}