1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::from_body::FromBody;
10use crate::message::Message;
11use crate::value::Value;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
15pub enum ExchangePattern {
16 #[default]
18 InOnly,
19 InOut,
21}
22
23#[derive(Debug)]
28pub struct Exchange {
29 pub input: Message,
31 pub output: Option<Message>,
33 pub properties: HashMap<String, Value>,
35 pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
38 pub error: Option<CamelError>,
40 pub pattern: ExchangePattern,
42 pub correlation_id: String,
44 pub otel_context: Context,
48}
49
50impl Exchange {
51 pub fn new(input: Message) -> Self {
53 Self {
54 input,
55 output: None,
56 properties: HashMap::new(),
57 extensions: HashMap::new(),
58 error: None,
59 pattern: ExchangePattern::default(),
60 correlation_id: Uuid::new_v4().to_string(),
61 otel_context: Context::new(),
62 }
63 }
64
65 pub fn new_in_out(input: Message) -> Self {
67 Self {
68 input,
69 output: None,
70 properties: HashMap::new(),
71 extensions: HashMap::new(),
72 error: None,
73 pattern: ExchangePattern::InOut,
74 correlation_id: Uuid::new_v4().to_string(),
75 otel_context: Context::new(),
76 }
77 }
78
79 pub fn correlation_id(&self) -> &str {
81 &self.correlation_id
82 }
83
84 pub fn property(&self, key: &str) -> Option<&Value> {
86 self.properties.get(key)
87 }
88
89 pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
91 self.properties.insert(key.into(), value.into());
92 }
93
94 pub fn has_error(&self) -> bool {
96 self.error.is_some()
97 }
98
99 pub fn set_error(&mut self, error: CamelError) {
101 self.error = Some(error);
102 }
103
104 pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
106 self.extensions.insert(key.into(), value);
107 }
108
109 pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
112 self.extensions.get(key)?.downcast_ref::<T>()
113 }
114
115 pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
127 T::from_body(&self.input.body)
128 }
129}
130
131impl Clone for Exchange {
132 fn clone(&self) -> Self {
133 Self {
134 input: self.input.clone(),
135 output: self.output.clone(),
136 properties: self.properties.clone(),
137 extensions: self.extensions.clone(), error: self.error.clone(),
139 pattern: self.pattern,
140 correlation_id: self.correlation_id.clone(),
141 otel_context: self.otel_context.clone(),
142 }
143 }
144}
145
146impl Default for Exchange {
147 fn default() -> Self {
148 Self::new(Message::default())
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use crate::Body;
156 use serde_json::json;
157
158 #[test]
159 fn test_exchange_new() {
160 let msg = Message::new("test");
161 let ex = Exchange::new(msg);
162 assert_eq!(ex.input.body.as_text(), Some("test"));
163 assert!(ex.output.is_none());
164 assert!(!ex.has_error());
165 assert_eq!(ex.pattern, ExchangePattern::InOnly);
166 }
167
168 #[test]
169 fn test_exchange_in_out() {
170 let ex = Exchange::new_in_out(Message::default());
171 assert_eq!(ex.pattern, ExchangePattern::InOut);
172 }
173
174 #[test]
175 fn test_exchange_properties() {
176 let mut ex = Exchange::default();
177 ex.set_property("key", Value::Bool(true));
178 assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
179 assert_eq!(ex.property("missing"), None);
180 }
181
182 #[test]
183 fn test_exchange_error() {
184 let mut ex = Exchange::default();
185 assert!(!ex.has_error());
186 ex.set_error(CamelError::ProcessorError("test".into()));
187 assert!(ex.has_error());
188 }
189
190 #[test]
191 fn test_exchange_lifecycle() {
192 let mut ex = Exchange::new(Message::new("input data"));
193 assert_eq!(ex.input.body.as_text(), Some("input data"));
194
195 ex.set_property("processed", Value::Bool(true));
197
198 ex.output = Some(Message::new("output data"));
200 assert!(ex.output.is_some());
201
202 assert!(!ex.has_error());
204 }
205
206 #[test]
207 fn test_exchange_otel_context_default() {
208 let ex = Exchange::default();
209 use opentelemetry::trace::TraceContextExt;
212 assert!(!ex.otel_context.span().span_context().is_valid());
213 }
214
215 #[test]
216 fn test_exchange_otel_context_propagates_in_clone() {
217 let ex = Exchange::default();
218 let cloned = ex.clone();
219 use opentelemetry::trace::TraceContextExt;
221 assert!(!cloned.otel_context.span().span_context().is_valid());
222 }
223
224 #[test]
225 fn test_set_and_get_extension() {
226 use std::sync::Arc;
227 let mut ex = Exchange::default();
228 ex.set_extension("my.key", Arc::new(42u32));
229 let val: Option<&u32> = ex.get_extension("my.key");
230 assert_eq!(val, Some(&42u32));
231 }
232
233 #[test]
234 fn test_get_extension_wrong_type_returns_none() {
235 use std::sync::Arc;
236 let mut ex = Exchange::default();
237 ex.set_extension("my.key", Arc::new(42u32));
238 let val: Option<&String> = ex.get_extension("my.key");
239 assert!(val.is_none());
240 }
241
242 #[test]
243 fn test_get_extension_missing_key_returns_none() {
244 let ex = Exchange::default();
245 let val: Option<&u32> = ex.get_extension("nope");
246 assert!(val.is_none());
247 }
248
249 #[test]
250 fn test_clone_shares_extension_arc() {
251 use std::sync::Arc;
252 let mut ex = Exchange::default();
253 ex.set_extension("shared", Arc::new(99u64));
254 let cloned = ex.clone();
255 assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
257 assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));
258 }
259
260 #[test]
261 fn test_body_as_string_from_text() {
262 let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));
263
264 let result = ex.body_as::<String>();
265
266 assert_eq!(result.unwrap(), "hello");
267 }
268
269 #[test]
270 fn test_body_as_string_from_json_string() {
271 let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));
272
273 let result = ex.body_as::<String>();
274
275 assert_eq!(result.unwrap(), "hello");
276 }
277
278 #[test]
279 fn test_body_as_json_value_from_json_number() {
280 let ex = Exchange::new(Message::new(Body::Json(json!(42))));
281
282 let result = ex.body_as::<serde_json::Value>();
283
284 assert_eq!(result.unwrap(), json!(42));
285 }
286
287 #[test]
288 fn test_body_as_vec_u8_from_bytes() {
289 let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));
290
291 let result = ex.body_as::<Vec<u8>>();
292
293 assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
294 }
295
296 #[test]
297 fn test_body_as_string_from_empty_returns_err() {
298 let ex = Exchange::new(Message::new(Body::Empty));
299
300 let result = ex.body_as::<String>();
301
302 assert!(result.is_err());
303 }
304}