Skip to main content

camel_api/
exchange.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
/// Extension key under which `use_original_message` stashes the pre-route
/// `Message`, so the error handler can restore the original body and headers
/// before DLC dispatch.
pub const ORIGINAL_MESSAGE_EXTENSION: &str = "CamelOriginalMessage";
16
/// Property key for the stop signal recognized by the pipeline executor.
///
/// Processors such as `ThrottleStrategy::Drop` and `SamplingService` set this
/// property because they are Tower `Service<Exchange>` implementations
/// (Process mode) rather than `OutcomePipeline` segments, and therefore cannot
/// return `PipelineOutcome::Stopped` themselves. The executor inspects the
/// property after each step completes. See the ADR-0024 amendment.
pub const CAMEL_STOP: &str = "CamelStop";
22
23/// Check whether the exchange carries the CamelStop signal.
24///
25/// Returns `true` if the `CamelStop` property is set to a boolean `true` value.
26/// Used by the pipeline executor (`run_steps`) and segment adapters to detect
27/// Process-mode processors that cannot return `PipelineOutcome::Stopped` directly.
28pub fn is_camel_stop(exchange: &Exchange) -> bool {
29    exchange
30        .property(CAMEL_STOP)
31        .and_then(|v| v.as_bool())
32        .unwrap_or(false)
33}
34
/// Property key holding the exception message (the error's `Display` string).
pub const PROPERTY_EXCEPTION_MESSAGE: &str = "CamelExceptionMessage";
/// Property key holding the exception kind (the variant name produced by `classify()`).
pub const PROPERTY_EXCEPTION_KIND: &str = "CamelExceptionKind";
/// Property key holding the caught exception; an alias kept for Java Camel parity.
pub const PROPERTY_EXCEPTION_CAUGHT: &str = "CamelExceptionCaught";
/// Property key set once an exception has been handled (Java Camel parity).
pub const PROPERTY_EXCEPTION_HANDLED: &str = "CamelExceptionHandled";
43
/// Message exchange pattern: either fire-and-forget or request-reply.
///
/// The default pattern is [`ExchangePattern::InOnly`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExchangePattern {
    /// Fire-and-forget: the message is sent and no reply is expected.
    #[default]
    InOnly,
    /// Request-reply: the message is sent and a reply is expected.
    InOut,
}
53
54/// An Exchange represents a message being routed through the framework.
55///
56/// It contains the input message, an optional output message,
57/// properties for passing data between processors, and error state.
58#[derive(Debug)]
59pub struct Exchange {
60    /// The input (incoming) message.
61    pub input: Message,
62    /// The output (response) message, populated for InOut patterns.
63    pub output: Option<Message>,
64    /// Exchange-scoped properties for passing data between processors.
65    pub properties: HashMap<String, Value>,
66    /// Non-serializable extension values (e.g., channel senders).
67    /// Stored as `Arc<dyn Any + Send + Sync>` so cloning is cheap (ref-count bump).
68    pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
69    /// Error state, if processing failed.
70    pub error: Option<CamelError>,
71    /// The exchange pattern.
72    pub pattern: ExchangePattern,
73    /// Unique correlation ID for distributed tracing.
74    pub correlation_id: String,
75    /// OpenTelemetry context for distributed tracing propagation.
76    /// Carries the active span context between processing steps.
77    /// Defaults to an empty context (noop span) if OTel is not active.
78    pub otel_context: Context,
79}
80
81impl Exchange {
82    /// Create a new exchange with the given input message.
83    pub fn new(input: Message) -> Self {
84        Self {
85            input,
86            output: None,
87            properties: HashMap::new(),
88            extensions: HashMap::new(),
89            error: None,
90            pattern: ExchangePattern::default(),
91            correlation_id: Uuid::new_v4().to_string(),
92            otel_context: Context::new(),
93        }
94    }
95
96    /// Create a new exchange with the InOut pattern.
97    pub fn new_in_out(input: Message) -> Self {
98        Self {
99            input,
100            output: None,
101            properties: HashMap::new(),
102            extensions: HashMap::new(),
103            error: None,
104            pattern: ExchangePattern::InOut,
105            correlation_id: Uuid::new_v4().to_string(),
106            otel_context: Context::new(),
107        }
108    }
109
110    /// Get the correlation ID for this exchange.
111    pub fn correlation_id(&self) -> &str {
112        &self.correlation_id
113    }
114
115    /// Get a property value.
116    pub fn property(&self, key: &str) -> Option<&Value> {
117        self.properties.get(key)
118    }
119
120    /// Set a property value.
121    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
122        self.properties.insert(key.into(), value.into());
123    }
124
125    /// Check if the exchange has an error.
126    pub fn has_error(&self) -> bool {
127        self.error.is_some()
128    }
129
130    /// Set an error on this exchange.
131    ///
132    /// Automatically populates exchange properties with error context so that
133    /// all languages can access error information via their property mechanisms:
134    /// - `CamelExceptionMessage` — the error's Display string
135    /// - `CamelExceptionKind` — the error variant name (via `CamelError::classify()`)
136    /// - `CamelExceptionCaught` — alias for Java Camel parity
137    pub fn set_error(&mut self, error: CamelError) {
138        let msg = error.to_string();
139        let kind = error.classify().to_string();
140        self.properties.insert(
141            PROPERTY_EXCEPTION_MESSAGE.to_string(),
142            Value::String(msg.clone()),
143        );
144        self.properties
145            .insert(PROPERTY_EXCEPTION_KIND.to_string(), Value::String(kind));
146        self.properties
147            .insert(PROPERTY_EXCEPTION_CAUGHT.to_string(), Value::String(msg));
148        self.error = Some(error);
149    }
150
151    /// Clear the error and remove all exception properties.
152    ///
153    /// Called after `handled:true` in on_exception steps to prevent stale
154    /// error state from leaking into subsequent processing.
155    pub fn clear_error(&mut self) {
156        self.error = None;
157        self.properties.remove(PROPERTY_EXCEPTION_MESSAGE);
158        self.properties.remove(PROPERTY_EXCEPTION_KIND);
159        self.properties.remove(PROPERTY_EXCEPTION_CAUGHT);
160    }
161
162    /// Mark the exception as handled and clear the error.
163    ///
164    /// Sets `CamelExceptionHandled = true` then calls `clear_error()`.
165    /// This matches Java Camel's `Exchange.EXCEPTION_HANDLED` semantics.
166    pub fn handle_error(&mut self) {
167        self.properties
168            .insert(PROPERTY_EXCEPTION_HANDLED.to_string(), Value::Bool(true));
169        self.clear_error();
170    }
171
172    /// Store a non-serializable extension value (e.g. a channel sender).
173    pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
174        self.extensions.insert(key.into(), value);
175    }
176
177    /// Retrieve a typed extension value. Returns `None` if the key is absent
178    /// or the stored value is not of type `T`.
179    pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
180        self.extensions.get(key)?.downcast_ref::<T>()
181    }
182
183    /// Deserialize the body into type `T`.
184    ///
185    /// Uses built-in conversions for `String`, `Vec<u8>`, [`bytes::Bytes`], and
186    /// `serde_json::Value`. For custom types, implement [`FromBody`] or use
187    /// [`impl_from_body_via_serde!`].
188    ///
189    /// # Example
190    /// ```rust,ignore
191    /// let text: String = exchange.body_as::<String>()?;
192    /// let raw: Vec<u8> = exchange.body_as::<Vec<u8>>()?;
193    /// ```
194    pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
195        T::from_body(&self.input.body)
196    }
197}
198
199impl Clone for Exchange {
200    fn clone(&self) -> Self {
201        Self {
202            input: self.input.clone(),
203            output: self.output.clone(),
204            properties: self.properties.clone(),
205            extensions: self.extensions.clone(), // Arc ref-count bump, cheap
206            error: self.error.clone(),
207            pattern: self.pattern,
208            correlation_id: self.correlation_id.clone(),
209            otel_context: self.otel_context.clone(),
210        }
211    }
212}
213
214impl Default for Exchange {
215    fn default() -> Self {
216        Self::new(Message::default())
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Body;
    use opentelemetry::trace::TraceContextExt;
    use serde_json::json;
    use std::sync::Arc;

    // --- construction -----------------------------------------------------

    #[test]
    fn test_exchange_new() {
        let exchange = Exchange::new(Message::new("test"));

        assert_eq!(exchange.input.body.as_text(), Some("test"));
        assert!(exchange.output.is_none());
        assert!(!exchange.has_error());
        assert_eq!(exchange.pattern, ExchangePattern::InOnly);
    }

    #[test]
    fn test_exchange_in_out() {
        let exchange = Exchange::new_in_out(Message::default());

        assert_eq!(exchange.pattern, ExchangePattern::InOut);
    }

    // --- properties -------------------------------------------------------

    #[test]
    fn test_exchange_properties() {
        let mut exchange = Exchange::default();
        exchange.set_property("key", Value::Bool(true));

        assert_eq!(exchange.property("key"), Some(&Value::Bool(true)));
        assert_eq!(exchange.property("missing"), None);
    }

    // --- error handling ---------------------------------------------------

    #[test]
    fn test_exchange_error() {
        let mut exchange = Exchange::default();
        assert!(!exchange.has_error());

        exchange.set_error(CamelError::ProcessorError("test".into()));

        assert!(exchange.has_error());
    }

    #[test]
    fn test_set_error_populates_properties() {
        let mut exchange = Exchange::default();
        exchange.set_error(CamelError::ProcessorError("boom".into()));
        assert!(exchange.has_error());

        let expected_text = Value::String("Processor error: boom".to_string());
        let expected_kind = Value::String("processor".to_string());

        assert_eq!(
            exchange.properties.get(PROPERTY_EXCEPTION_MESSAGE),
            Some(&expected_text)
        );
        assert_eq!(
            exchange.properties.get(PROPERTY_EXCEPTION_KIND),
            Some(&expected_kind)
        );
        assert_eq!(
            exchange.properties.get(PROPERTY_EXCEPTION_CAUGHT),
            Some(&expected_text)
        );
    }

    #[test]
    fn test_clear_error_removes_properties() {
        let mut exchange = Exchange::default();
        exchange.set_error(CamelError::RouteError("fail".into()));
        assert!(exchange.has_error());
        assert!(exchange.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));

        exchange.clear_error();

        assert!(!exchange.has_error());
        for key in [
            PROPERTY_EXCEPTION_MESSAGE,
            PROPERTY_EXCEPTION_KIND,
            PROPERTY_EXCEPTION_CAUGHT,
        ] {
            assert!(!exchange.properties.contains_key(key));
        }
    }

    // --- lifecycle --------------------------------------------------------

    #[test]
    fn test_exchange_lifecycle() {
        let mut exchange = Exchange::new(Message::new("input data"));
        assert_eq!(exchange.input.body.as_text(), Some("input data"));

        // A processor records a property...
        exchange.set_property("processed", Value::Bool(true));

        // ...and produces an output message.
        exchange.output = Some(Message::new("output data"));
        assert!(exchange.output.is_some());

        // None of that should have raised an error.
        assert!(!exchange.has_error());
    }

    // --- OpenTelemetry context -------------------------------------------

    #[test]
    fn test_exchange_otel_context_default() {
        let exchange = Exchange::default();

        // The field must exist and be accessible (compilation covers that);
        // a fresh context carries a noop span, i.e. an invalid span context.
        let span_is_valid = exchange.otel_context.span().span_context().is_valid();
        assert!(!span_is_valid);
    }

    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        let exchange = Exchange::default();
        let copy = exchange.clone();

        // The clone carries the same (empty) context as the original.
        let span_is_valid = copy.otel_context.span().span_context().is_valid();
        assert!(!span_is_valid);
    }

    // --- extensions -------------------------------------------------------

    #[test]
    fn test_set_and_get_extension() {
        let mut exchange = Exchange::default();
        exchange.set_extension("my.key", Arc::new(42u32));

        let found: Option<&u32> = exchange.get_extension("my.key");

        assert_eq!(found, Some(&42u32));
    }

    #[test]
    fn test_get_extension_wrong_type_returns_none() {
        let mut exchange = Exchange::default();
        exchange.set_extension("my.key", Arc::new(42u32));

        let found: Option<&String> = exchange.get_extension("my.key");

        assert!(found.is_none());
    }

    #[test]
    fn test_get_extension_missing_key_returns_none() {
        let exchange = Exchange::default();

        let found: Option<&u32> = exchange.get_extension("nope");

        assert!(found.is_none());
    }

    #[test]
    fn test_clone_shares_extension_arc() {
        let mut exchange = Exchange::default();
        exchange.set_extension("shared", Arc::new(99u64));
        let copy = exchange.clone();

        // Original and clone both observe the same value.
        assert_eq!(exchange.get_extension::<u64>("shared"), Some(&99u64));
        assert_eq!(copy.get_extension::<u64>("shared"), Some(&99u64));
    }

    // --- body conversion --------------------------------------------------

    #[test]
    fn test_body_as_string_from_text() {
        let exchange = Exchange::new(Message::new(Body::Text("hello".to_string())));

        let converted = exchange.body_as::<String>();

        assert_eq!(converted.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_string_from_json_string() {
        let exchange = Exchange::new(Message::new(Body::Json(json!("hello"))));

        let converted = exchange.body_as::<String>();

        assert_eq!(converted.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_json_value_from_json_number() {
        let exchange = Exchange::new(Message::new(Body::Json(json!(42))));

        let converted = exchange.body_as::<serde_json::Value>();

        assert_eq!(converted.unwrap(), json!(42));
    }

    #[test]
    fn test_body_as_vec_u8_from_bytes() {
        let exchange = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));

        let converted = exchange.body_as::<Vec<u8>>();

        assert_eq!(converted.unwrap(), vec![1u8, 2, 3, 4]);
    }

    #[test]
    fn test_body_as_string_from_empty_returns_err() {
        let exchange = Exchange::new(Message::new(Body::Empty));

        let converted = exchange.body_as::<String>();

        assert!(converted.is_err());
    }
}