Skip to main content

camel_api/
exchange.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::message::Message;
10use crate::value::Value;
11
/// Describes how a message is exchanged: one-way or request-reply.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExchangePattern {
    /// One-way (fire-and-forget) delivery: the message is sent and no reply
    /// is expected. This is the default pattern.
    #[default]
    InOnly,
    /// Two-way (request-reply) delivery: the message is sent and a reply
    /// is expected.
    InOut,
}
21
22/// An Exchange represents a message being routed through the framework.
23///
24/// It contains the input message, an optional output message,
25/// properties for passing data between processors, and error state.
26#[derive(Debug)]
27pub struct Exchange {
28    /// The input (incoming) message.
29    pub input: Message,
30    /// The output (response) message, populated for InOut patterns.
31    pub output: Option<Message>,
32    /// Exchange-scoped properties for passing data between processors.
33    pub properties: HashMap<String, Value>,
34    /// Non-serializable extension values (e.g., channel senders).
35    /// Stored as `Arc<dyn Any + Send + Sync>` so cloning is cheap (ref-count bump).
36    pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
37    /// Error state, if processing failed.
38    pub error: Option<CamelError>,
39    /// The exchange pattern.
40    pub pattern: ExchangePattern,
41    /// Unique correlation ID for distributed tracing.
42    pub correlation_id: String,
43    /// OpenTelemetry context for distributed tracing propagation.
44    /// Carries the active span context between processing steps.
45    /// Defaults to an empty context (noop span) if OTel is not active.
46    pub otel_context: Context,
47}
48
49impl Exchange {
50    /// Create a new exchange with the given input message.
51    pub fn new(input: Message) -> Self {
52        Self {
53            input,
54            output: None,
55            properties: HashMap::new(),
56            extensions: HashMap::new(),
57            error: None,
58            pattern: ExchangePattern::default(),
59            correlation_id: Uuid::new_v4().to_string(),
60            otel_context: Context::new(),
61        }
62    }
63
64    /// Create a new exchange with the InOut pattern.
65    pub fn new_in_out(input: Message) -> Self {
66        Self {
67            input,
68            output: None,
69            properties: HashMap::new(),
70            extensions: HashMap::new(),
71            error: None,
72            pattern: ExchangePattern::InOut,
73            correlation_id: Uuid::new_v4().to_string(),
74            otel_context: Context::new(),
75        }
76    }
77
78    /// Get the correlation ID for this exchange.
79    pub fn correlation_id(&self) -> &str {
80        &self.correlation_id
81    }
82
83    /// Get a property value.
84    pub fn property(&self, key: &str) -> Option<&Value> {
85        self.properties.get(key)
86    }
87
88    /// Set a property value.
89    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
90        self.properties.insert(key.into(), value.into());
91    }
92
93    /// Check if the exchange has an error.
94    pub fn has_error(&self) -> bool {
95        self.error.is_some()
96    }
97
98    /// Set an error on this exchange.
99    pub fn set_error(&mut self, error: CamelError) {
100        self.error = Some(error);
101    }
102
103    /// Store a non-serializable extension value (e.g. a channel sender).
104    pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
105        self.extensions.insert(key.into(), value);
106    }
107
108    /// Retrieve a typed extension value. Returns `None` if the key is absent
109    /// or the stored value is not of type `T`.
110    pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
111        self.extensions.get(key)?.downcast_ref::<T>()
112    }
113}
114
115impl Clone for Exchange {
116    fn clone(&self) -> Self {
117        Self {
118            input: self.input.clone(),
119            output: self.output.clone(),
120            properties: self.properties.clone(),
121            extensions: self.extensions.clone(), // Arc ref-count bump, cheap
122            error: self.error.clone(),
123            pattern: self.pattern,
124            correlation_id: self.correlation_id.clone(),
125            otel_context: self.otel_context.clone(),
126        }
127    }
128}
129
130impl Default for Exchange {
131    fn default() -> Self {
132        Self::new(Message::default())
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed exchange keeps its input and starts clean.
    #[test]
    fn test_exchange_new() {
        let msg = Message::new("test");
        let ex = Exchange::new(msg);
        assert_eq!(ex.input.body.as_text(), Some("test"));
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
        assert_eq!(ex.pattern, ExchangePattern::InOnly);
    }

    /// `new_in_out` selects the request-reply pattern.
    #[test]
    fn test_exchange_in_out() {
        let ex = Exchange::new_in_out(Message::default());
        assert_eq!(ex.pattern, ExchangePattern::InOut);
    }

    /// Properties round-trip through `set_property` / `property`.
    #[test]
    fn test_exchange_properties() {
        let mut ex = Exchange::default();
        ex.set_property("key", Value::Bool(true));
        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
        assert_eq!(ex.property("missing"), None);
    }

    /// `has_error` flips once an error is recorded.
    #[test]
    fn test_exchange_error() {
        let mut ex = Exchange::default();
        assert!(!ex.has_error());
        ex.set_error(CamelError::ProcessorError("test".into()));
        assert!(ex.has_error());
    }

    /// Walks an exchange through input, property, and output steps.
    #[test]
    fn test_exchange_lifecycle() {
        let mut ex = Exchange::new(Message::new("input data"));
        assert_eq!(ex.input.body.as_text(), Some("input data"));

        // Set some properties
        ex.set_property("processed", Value::Bool(true));

        // Set output
        ex.output = Some(Message::new("output data"));
        assert!(ex.output.is_some());

        // Verify no error
        assert!(!ex.has_error());
    }

    /// A default exchange carries a context without a valid span.
    #[test]
    fn test_exchange_otel_context_default() {
        use opentelemetry::trace::TraceContextExt;
        let ex = Exchange::default();
        // Field must exist and be accessible; also verify it is a fresh
        // context (noop span).
        assert!(!ex.otel_context.span().span_context().is_valid());
    }

    /// Cloning keeps the (empty) context on both the original and the clone.
    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        use opentelemetry::trace::TraceContextExt;
        let ex = Exchange::default();
        let cloned = ex.clone();
        // Check BOTH sides: the original must be untouched by the clone,
        // and the clone must carry the same (empty) context.
        assert!(!ex.otel_context.span().span_context().is_valid());
        assert!(!cloned.otel_context.span().span_context().is_valid());
    }

    /// A stored extension can be read back with its concrete type.
    #[test]
    fn test_set_and_get_extension() {
        use std::sync::Arc;
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&u32> = ex.get_extension("my.key");
        assert_eq!(val, Some(&42u32));
    }

    /// Asking for the wrong concrete type yields `None`, not a panic.
    #[test]
    fn test_get_extension_wrong_type_returns_none() {
        use std::sync::Arc;
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&String> = ex.get_extension("my.key");
        assert!(val.is_none());
    }

    /// Looking up an absent key yields `None`.
    #[test]
    fn test_get_extension_missing_key_returns_none() {
        let ex = Exchange::default();
        let val: Option<&u32> = ex.get_extension("nope");
        assert!(val.is_none());
    }

    /// Cloning an exchange shares extension values instead of copying them.
    #[test]
    fn test_clone_shares_extension_arc() {
        use std::sync::Arc;
        let mut ex = Exchange::default();
        ex.set_extension("shared", Arc::new(99u64));
        let cloned = ex.clone();

        // Both see the same value
        let from_original = ex.get_extension::<u64>("shared");
        let from_clone = cloned.get_extension::<u64>("shared");
        assert_eq!(from_original, Some(&99u64));
        assert_eq!(from_clone, Some(&99u64));

        // Value equality alone would also pass for a deep copy; compare
        // addresses to prove both exchanges point at the same allocation.
        let original_ref = from_original.expect("extension present on original");
        let cloned_ref = from_clone.expect("extension present on clone");
        assert!(std::ptr::eq(original_ref, cloned_ref));
    }
}