1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
13pub const PROPERTY_EXCEPTION_MESSAGE: &str = "CamelExceptionMessage";
15pub const PROPERTY_EXCEPTION_KIND: &str = "CamelExceptionKind";
17pub const PROPERTY_EXCEPTION_CAUGHT: &str = "CamelExceptionCaught";
19pub const PROPERTY_EXCEPTION_HANDLED: &str = "CamelExceptionHandled";
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
24pub enum ExchangePattern {
25 #[default]
27 InOnly,
28 InOut,
30}
31
32#[derive(Debug)]
37pub struct Exchange {
38 pub input: Message,
40 pub output: Option<Message>,
42 pub properties: HashMap<String, Value>,
44 pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
47 pub error: Option<CamelError>,
49 pub pattern: ExchangePattern,
51 pub correlation_id: String,
53 pub otel_context: Context,
57}
58
59impl Exchange {
60 pub fn new(input: Message) -> Self {
62 Self {
63 input,
64 output: None,
65 properties: HashMap::new(),
66 extensions: HashMap::new(),
67 error: None,
68 pattern: ExchangePattern::default(),
69 correlation_id: Uuid::new_v4().to_string(),
70 otel_context: Context::new(),
71 }
72 }
73
74 pub fn new_in_out(input: Message) -> Self {
76 Self {
77 input,
78 output: None,
79 properties: HashMap::new(),
80 extensions: HashMap::new(),
81 error: None,
82 pattern: ExchangePattern::InOut,
83 correlation_id: Uuid::new_v4().to_string(),
84 otel_context: Context::new(),
85 }
86 }
87
88 pub fn correlation_id(&self) -> &str {
90 &self.correlation_id
91 }
92
93 pub fn property(&self, key: &str) -> Option<&Value> {
95 self.properties.get(key)
96 }
97
98 pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
100 self.properties.insert(key.into(), value.into());
101 }
102
103 pub fn has_error(&self) -> bool {
105 self.error.is_some()
106 }
107
108 pub fn set_error(&mut self, error: CamelError) {
116 let msg = error.to_string();
117 let kind = error.classify().to_string();
118 self.properties.insert(
119 PROPERTY_EXCEPTION_MESSAGE.to_string(),
120 Value::String(msg.clone()),
121 );
122 self.properties
123 .insert(PROPERTY_EXCEPTION_KIND.to_string(), Value::String(kind));
124 self.properties
125 .insert(PROPERTY_EXCEPTION_CAUGHT.to_string(), Value::String(msg));
126 self.error = Some(error);
127 }
128
129 pub fn clear_error(&mut self) {
134 self.error = None;
135 self.properties.remove(PROPERTY_EXCEPTION_MESSAGE);
136 self.properties.remove(PROPERTY_EXCEPTION_KIND);
137 self.properties.remove(PROPERTY_EXCEPTION_CAUGHT);
138 }
139
140 pub fn handle_error(&mut self) {
145 self.properties
146 .insert(PROPERTY_EXCEPTION_HANDLED.to_string(), Value::Bool(true));
147 self.clear_error();
148 }
149
150 pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
152 self.extensions.insert(key.into(), value);
153 }
154
155 pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
158 self.extensions.get(key)?.downcast_ref::<T>()
159 }
160
161 pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
173 T::from_body(&self.input.body)
174 }
175}
176
177impl Clone for Exchange {
178 fn clone(&self) -> Self {
179 Self {
180 input: self.input.clone(),
181 output: self.output.clone(),
182 properties: self.properties.clone(),
183 extensions: self.extensions.clone(), error: self.error.clone(),
185 pattern: self.pattern,
186 correlation_id: self.correlation_id.clone(),
187 otel_context: self.otel_context.clone(),
188 }
189 }
190}
191
192impl Default for Exchange {
193 fn default() -> Self {
194 Self::new(Message::default())
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use crate::Body;
202 use serde_json::json;
203
204 #[test]
205 fn test_exchange_new() {
206 let msg = Message::new("test");
207 let ex = Exchange::new(msg);
208 assert_eq!(ex.input.body.as_text(), Some("test"));
209 assert!(ex.output.is_none());
210 assert!(!ex.has_error());
211 assert_eq!(ex.pattern, ExchangePattern::InOnly);
212 }
213
214 #[test]
215 fn test_exchange_in_out() {
216 let ex = Exchange::new_in_out(Message::default());
217 assert_eq!(ex.pattern, ExchangePattern::InOut);
218 }
219
220 #[test]
221 fn test_exchange_properties() {
222 let mut ex = Exchange::default();
223 ex.set_property("key", Value::Bool(true));
224 assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
225 assert_eq!(ex.property("missing"), None);
226 }
227
228 #[test]
229 fn test_exchange_error() {
230 let mut ex = Exchange::default();
231 assert!(!ex.has_error());
232 ex.set_error(CamelError::ProcessorError("test".into()));
233 assert!(ex.has_error());
234 }
235
236 #[test]
237 fn test_set_error_populates_properties() {
238 let mut ex = Exchange::default();
239 ex.set_error(CamelError::ProcessorError("boom".into()));
240
241 assert!(ex.has_error());
242 assert_eq!(
243 ex.properties.get(PROPERTY_EXCEPTION_MESSAGE),
244 Some(&Value::String("Processor error: boom".to_string()))
245 );
246 assert_eq!(
247 ex.properties.get(PROPERTY_EXCEPTION_KIND),
248 Some(&Value::String("processor".to_string()))
249 );
250 assert_eq!(
251 ex.properties.get(PROPERTY_EXCEPTION_CAUGHT),
252 Some(&Value::String("Processor error: boom".to_string()))
253 );
254 }
255
256 #[test]
257 fn test_clear_error_removes_properties() {
258 let mut ex = Exchange::default();
259 ex.set_error(CamelError::RouteError("fail".into()));
260 assert!(ex.has_error());
261 assert!(ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
262
263 ex.clear_error();
264
265 assert!(!ex.has_error());
266 assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
267 assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_KIND));
268 assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_CAUGHT));
269 }
270
271 #[test]
272 fn test_exchange_lifecycle() {
273 let mut ex = Exchange::new(Message::new("input data"));
274 assert_eq!(ex.input.body.as_text(), Some("input data"));
275
276 ex.set_property("processed", Value::Bool(true));
278
279 ex.output = Some(Message::new("output data"));
281 assert!(ex.output.is_some());
282
283 assert!(!ex.has_error());
285 }
286
287 #[test]
288 fn test_exchange_otel_context_default() {
289 let ex = Exchange::default();
290 use opentelemetry::trace::TraceContextExt;
293 assert!(!ex.otel_context.span().span_context().is_valid());
294 }
295
296 #[test]
297 fn test_exchange_otel_context_propagates_in_clone() {
298 let ex = Exchange::default();
299 let cloned = ex.clone();
300 use opentelemetry::trace::TraceContextExt;
302 assert!(!cloned.otel_context.span().span_context().is_valid());
303 }
304
305 #[test]
306 fn test_set_and_get_extension() {
307 use std::sync::Arc;
308 let mut ex = Exchange::default();
309 ex.set_extension("my.key", Arc::new(42u32));
310 let val: Option<&u32> = ex.get_extension("my.key");
311 assert_eq!(val, Some(&42u32));
312 }
313
314 #[test]
315 fn test_get_extension_wrong_type_returns_none() {
316 use std::sync::Arc;
317 let mut ex = Exchange::default();
318 ex.set_extension("my.key", Arc::new(42u32));
319 let val: Option<&String> = ex.get_extension("my.key");
320 assert!(val.is_none());
321 }
322
323 #[test]
324 fn test_get_extension_missing_key_returns_none() {
325 let ex = Exchange::default();
326 let val: Option<&u32> = ex.get_extension("nope");
327 assert!(val.is_none());
328 }
329
330 #[test]
331 fn test_clone_shares_extension_arc() {
332 use std::sync::Arc;
333 let mut ex = Exchange::default();
334 ex.set_extension("shared", Arc::new(99u64));
335 let cloned = ex.clone();
336 assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
338 assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));
339 }
340
341 #[test]
342 fn test_body_as_string_from_text() {
343 let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));
344
345 let result = ex.body_as::<String>();
346
347 assert_eq!(result.unwrap(), "hello");
348 }
349
350 #[test]
351 fn test_body_as_string_from_json_string() {
352 let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));
353
354 let result = ex.body_as::<String>();
355
356 assert_eq!(result.unwrap(), "hello");
357 }
358
359 #[test]
360 fn test_body_as_json_value_from_json_number() {
361 let ex = Exchange::new(Message::new(Body::Json(json!(42))));
362
363 let result = ex.body_as::<serde_json::Value>();
364
365 assert_eq!(result.unwrap(), json!(42));
366 }
367
368 #[test]
369 fn test_body_as_vec_u8_from_bytes() {
370 let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));
371
372 let result = ex.body_as::<Vec<u8>>();
373
374 assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
375 }
376
377 #[test]
378 fn test_body_as_string_from_empty_returns_err() {
379 let ex = Exchange::new(Message::new(Body::Empty));
380
381 let result = ex.body_as::<String>();
382
383 assert!(result.is_err());
384 }
385}