1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
/// Key string `"CamelOriginalMessage"`.
///
/// NOTE(review): presumably the key under which a copy of the original message
/// is stored in `Exchange::extensions` — confirm against the callers.
pub const ORIGINAL_MESSAGE_EXTENSION: &str = "CamelOriginalMessage";

/// Property key (`"CamelStop"`) that [`is_camel_stop`] looks up on an exchange.
pub const CAMEL_STOP: &str = "CamelStop";
22
23pub fn is_camel_stop(exchange: &Exchange) -> bool {
29 exchange
30 .property(CAMEL_STOP)
31 .and_then(|v| v.as_bool())
32 .unwrap_or(false)
33}
34
/// Property key under which `Exchange::set_error` stores the error's display text.
pub const PROPERTY_EXCEPTION_MESSAGE: &str = "CamelExceptionMessage";
/// Property key under which `Exchange::set_error` stores the text of `error.classify()`.
pub const PROPERTY_EXCEPTION_KIND: &str = "CamelExceptionKind";
/// Property key that `Exchange::set_error` also fills with the error's display text.
pub const PROPERTY_EXCEPTION_CAUGHT: &str = "CamelExceptionCaught";
/// Property key that `Exchange::handle_error` sets to `true`.
pub const PROPERTY_EXCEPTION_HANDLED: &str = "CamelExceptionHandled";
43
/// Message exchange pattern attached to an `Exchange`.
///
/// NOTE(review): the variant names follow the usual one-way / request-reply
/// terminology; the exact routing semantics are defined by the code that reads
/// `Exchange::pattern`, which is not part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExchangePattern {
    /// The default pattern.
    #[default]
    InOnly,
    /// The non-default pattern; presumably a reply is expected — confirm.
    InOut,
}
53
54#[derive(Debug)]
59pub struct Exchange {
60 pub input: Message,
62 pub output: Option<Message>,
64 pub properties: HashMap<String, Value>,
66 pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
69 pub error: Option<CamelError>,
71 pub pattern: ExchangePattern,
73 pub correlation_id: String,
75 pub otel_context: Context,
79}
80
81impl Exchange {
82 pub fn new(input: Message) -> Self {
84 Self {
85 input,
86 output: None,
87 properties: HashMap::new(),
88 extensions: HashMap::new(),
89 error: None,
90 pattern: ExchangePattern::default(),
91 correlation_id: Uuid::new_v4().to_string(),
92 otel_context: Context::new(),
93 }
94 }
95
96 pub fn new_in_out(input: Message) -> Self {
98 Self {
99 input,
100 output: None,
101 properties: HashMap::new(),
102 extensions: HashMap::new(),
103 error: None,
104 pattern: ExchangePattern::InOut,
105 correlation_id: Uuid::new_v4().to_string(),
106 otel_context: Context::new(),
107 }
108 }
109
110 pub fn correlation_id(&self) -> &str {
112 &self.correlation_id
113 }
114
115 pub fn property(&self, key: &str) -> Option<&Value> {
117 self.properties.get(key)
118 }
119
120 pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
122 self.properties.insert(key.into(), value.into());
123 }
124
125 pub fn has_error(&self) -> bool {
127 self.error.is_some()
128 }
129
130 pub fn set_error(&mut self, error: CamelError) {
138 let msg = error.to_string();
139 let kind = error.classify().to_string();
140 self.properties.insert(
141 PROPERTY_EXCEPTION_MESSAGE.to_string(),
142 Value::String(msg.clone()),
143 );
144 self.properties
145 .insert(PROPERTY_EXCEPTION_KIND.to_string(), Value::String(kind));
146 self.properties
147 .insert(PROPERTY_EXCEPTION_CAUGHT.to_string(), Value::String(msg));
148 self.error = Some(error);
149 }
150
151 pub fn clear_error(&mut self) {
156 self.error = None;
157 self.properties.remove(PROPERTY_EXCEPTION_MESSAGE);
158 self.properties.remove(PROPERTY_EXCEPTION_KIND);
159 self.properties.remove(PROPERTY_EXCEPTION_CAUGHT);
160 }
161
162 pub fn handle_error(&mut self) {
167 self.properties
168 .insert(PROPERTY_EXCEPTION_HANDLED.to_string(), Value::Bool(true));
169 self.clear_error();
170 }
171
172 pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
174 self.extensions.insert(key.into(), value);
175 }
176
177 pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
180 self.extensions.get(key)?.downcast_ref::<T>()
181 }
182
183 pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
195 T::from_body(&self.input.body)
196 }
197}
198
199impl Clone for Exchange {
200 fn clone(&self) -> Self {
201 Self {
202 input: self.input.clone(),
203 output: self.output.clone(),
204 properties: self.properties.clone(),
205 extensions: self.extensions.clone(), error: self.error.clone(),
207 pattern: self.pattern,
208 correlation_id: self.correlation_id.clone(),
209 otel_context: self.otel_context.clone(),
210 }
211 }
212}
213
214impl Default for Exchange {
215 fn default() -> Self {
216 Self::new(Message::default())
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Body;
    use opentelemetry::trace::TraceContextExt;
    use serde_json::json;

    /// `Exchange::new` wraps the input and starts without output or error.
    #[test]
    fn test_exchange_new() {
        let msg = Message::new("test");
        let ex = Exchange::new(msg);
        assert_eq!(ex.input.body.as_text(), Some("test"));
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
        assert_eq!(ex.pattern, ExchangePattern::InOnly);
    }

    /// `Exchange::new_in_out` only differs in the selected pattern.
    #[test]
    fn test_exchange_in_out() {
        let ex = Exchange::new_in_out(Message::default());
        assert_eq!(ex.pattern, ExchangePattern::InOut);
    }

    #[test]
    fn test_exchange_properties() {
        let mut ex = Exchange::default();
        ex.set_property("key", Value::Bool(true));
        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
        assert_eq!(ex.property("missing"), None);
    }

    /// `is_camel_stop` is false without the property and follows its boolean value.
    #[test]
    fn test_is_camel_stop_follows_property() {
        let mut ex = Exchange::default();
        assert!(!is_camel_stop(&ex));

        ex.set_property(CAMEL_STOP, Value::Bool(true));
        assert!(is_camel_stop(&ex));

        ex.set_property(CAMEL_STOP, Value::Bool(false));
        assert!(!is_camel_stop(&ex));
    }

    #[test]
    fn test_exchange_error() {
        let mut ex = Exchange::default();
        assert!(!ex.has_error());
        ex.set_error(CamelError::ProcessorError("test".into()));
        assert!(ex.has_error());
    }

    #[test]
    fn test_set_error_populates_properties() {
        let mut ex = Exchange::default();
        ex.set_error(CamelError::ProcessorError("boom".into()));

        assert!(ex.has_error());
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_MESSAGE),
            Some(&Value::String("Processor error: boom".to_string()))
        );
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_KIND),
            Some(&Value::String("processor".to_string()))
        );
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_CAUGHT),
            Some(&Value::String("Processor error: boom".to_string()))
        );
    }

    #[test]
    fn test_clear_error_removes_properties() {
        let mut ex = Exchange::default();
        ex.set_error(CamelError::RouteError("fail".into()));
        assert!(ex.has_error());
        assert!(ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));

        ex.clear_error();

        assert!(!ex.has_error());
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_KIND));
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_CAUGHT));
    }

    /// `handle_error` must flag the exchange as handled and drop the error state.
    #[test]
    fn test_handle_error_marks_handled_and_clears_error() {
        let mut ex = Exchange::default();
        ex.set_error(CamelError::ProcessorError("boom".into()));

        ex.handle_error();

        assert!(!ex.has_error());
        assert_eq!(
            ex.property(PROPERTY_EXCEPTION_HANDLED),
            Some(&Value::Bool(true))
        );
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_MESSAGE));
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_KIND));
        assert!(!ex.properties.contains_key(PROPERTY_EXCEPTION_CAUGHT));
    }

    #[test]
    fn test_exchange_lifecycle() {
        let mut ex = Exchange::new(Message::new("input data"));
        assert_eq!(ex.input.body.as_text(), Some("input data"));

        ex.set_property("processed", Value::Bool(true));
        assert_eq!(ex.property("processed"), Some(&Value::Bool(true)));

        ex.output = Some(Message::new("output data"));
        assert!(ex.output.is_some());

        assert!(!ex.has_error());
    }

    #[test]
    fn test_exchange_otel_context_default() {
        let ex = Exchange::default();
        assert!(!ex.otel_context.span().span_context().is_valid());
    }

    // NOTE(review): this only shows that a default (invalid) span context is
    // still invalid after cloning; seeding a valid span context first would
    // make the propagation check meaningful.
    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        let ex = Exchange::default();
        let cloned = ex.clone();
        assert!(!cloned.otel_context.span().span_context().is_valid());
    }

    #[test]
    fn test_set_and_get_extension() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&u32> = ex.get_extension("my.key");
        assert_eq!(val, Some(&42u32));
    }

    #[test]
    fn test_get_extension_wrong_type_returns_none() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&String> = ex.get_extension("my.key");
        assert!(val.is_none());
    }

    #[test]
    fn test_get_extension_missing_key_returns_none() {
        let ex = Exchange::default();
        let val: Option<&u32> = ex.get_extension("nope");
        assert!(val.is_none());
    }

    /// Cloning must share the extension allocation, not merely copy its value.
    #[test]
    fn test_clone_shares_extension_arc() {
        let mut ex = Exchange::default();
        ex.set_extension("shared", Arc::new(99u64));
        let cloned = ex.clone();

        assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
        assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));

        // Value equality alone would also hold for a deep copy; pointer
        // identity is what proves the `Arc` is shared.
        let original = ex
            .extensions
            .get("shared")
            .expect("extension was just set");
        let copy = cloned
            .extensions
            .get("shared")
            .expect("clone keeps extension keys");
        assert!(Arc::ptr_eq(original, copy));
    }

    #[test]
    fn test_body_as_string_from_text() {
        let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));

        let result = ex.body_as::<String>();

        assert_eq!(result.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_string_from_json_string() {
        let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));

        let result = ex.body_as::<String>();

        assert_eq!(result.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_json_value_from_json_number() {
        let ex = Exchange::new(Message::new(Body::Json(json!(42))));

        let result = ex.body_as::<serde_json::Value>();

        assert_eq!(result.unwrap(), json!(42));
    }

    #[test]
    fn test_body_as_vec_u8_from_bytes() {
        let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));

        let result = ex.body_as::<Vec<u8>>();

        assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
    }

    #[test]
    fn test_body_as_string_from_empty_returns_err() {
        let ex = Exchange::new(Message::new(Body::Empty));

        let result = ex.body_as::<String>();

        assert!(result.is_err());
    }
}