1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use opentelemetry::Context;
6use uuid::Uuid;
7
8use crate::error::CamelError;
9use crate::message::Message;
10use crate::value::Value;
11
/// Message exchange pattern attached to an [`Exchange`].
///
/// The derived [`Default`] is [`ExchangePattern::InOnly`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExchangePattern {
    /// Default pattern.
    ///
    /// NOTE(review): presumably a one-way exchange where no reply is expected
    /// (Apache Camel naming) — confirm against the code that consumes it.
    #[default]
    InOnly,
    /// NOTE(review): presumably a request/reply exchange where an output
    /// message is expected — confirm against the code that consumes it.
    InOut,
}
21
22#[derive(Debug)]
27pub struct Exchange {
28 pub input: Message,
30 pub output: Option<Message>,
32 pub properties: HashMap<String, Value>,
34 pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
37 pub error: Option<CamelError>,
39 pub pattern: ExchangePattern,
41 pub correlation_id: String,
43 pub otel_context: Context,
47}
48
49impl Exchange {
50 pub fn new(input: Message) -> Self {
52 Self {
53 input,
54 output: None,
55 properties: HashMap::new(),
56 extensions: HashMap::new(),
57 error: None,
58 pattern: ExchangePattern::default(),
59 correlation_id: Uuid::new_v4().to_string(),
60 otel_context: Context::new(),
61 }
62 }
63
64 pub fn new_in_out(input: Message) -> Self {
66 Self {
67 input,
68 output: None,
69 properties: HashMap::new(),
70 extensions: HashMap::new(),
71 error: None,
72 pattern: ExchangePattern::InOut,
73 correlation_id: Uuid::new_v4().to_string(),
74 otel_context: Context::new(),
75 }
76 }
77
78 pub fn correlation_id(&self) -> &str {
80 &self.correlation_id
81 }
82
83 pub fn property(&self, key: &str) -> Option<&Value> {
85 self.properties.get(key)
86 }
87
88 pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
90 self.properties.insert(key.into(), value.into());
91 }
92
93 pub fn has_error(&self) -> bool {
95 self.error.is_some()
96 }
97
98 pub fn set_error(&mut self, error: CamelError) {
100 self.error = Some(error);
101 }
102
103 pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
105 self.extensions.insert(key.into(), value);
106 }
107
108 pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
111 self.extensions.get(key)?.downcast_ref::<T>()
112 }
113}
114
115impl Clone for Exchange {
116 fn clone(&self) -> Self {
117 Self {
118 input: self.input.clone(),
119 output: self.output.clone(),
120 properties: self.properties.clone(),
121 extensions: self.extensions.clone(), error: self.error.clone(),
123 pattern: self.pattern,
124 correlation_id: self.correlation_id.clone(),
125 otel_context: self.otel_context.clone(),
126 }
127 }
128}
129
130impl Default for Exchange {
131 fn default() -> Self {
132 Self::new(Message::default())
133 }
134}
135
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use opentelemetry::trace::TraceContextExt;

    use super::*;

    /// `new` wraps the input and starts with no output, no error and the
    /// default `InOnly` pattern.
    #[test]
    fn test_exchange_new() {
        let msg = Message::new("test");
        let ex = Exchange::new(msg);
        assert_eq!(ex.input.body.as_text(), Some("test"));
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
        assert_eq!(ex.pattern, ExchangePattern::InOnly);
    }

    /// `new_in_out` differs from `new` by selecting the `InOut` pattern.
    #[test]
    fn test_exchange_in_out() {
        let ex = Exchange::new_in_out(Message::default());
        assert_eq!(ex.pattern, ExchangePattern::InOut);
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
    }

    /// Each new exchange gets its own correlation id; a clone keeps it.
    #[test]
    fn test_correlation_id_unique_and_preserved_by_clone() {
        let first = Exchange::default();
        let second = Exchange::default();
        assert!(!first.correlation_id().is_empty());
        assert_ne!(first.correlation_id(), second.correlation_id());
        assert_eq!(first.clone().correlation_id(), first.correlation_id());
    }

    #[test]
    fn test_exchange_properties() {
        let mut ex = Exchange::default();
        ex.set_property("key", Value::Bool(true));
        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
        assert_eq!(ex.property("missing"), None);
    }

    #[test]
    fn test_exchange_error() {
        let mut ex = Exchange::default();
        assert!(!ex.has_error());
        ex.set_error(CamelError::ProcessorError("test".into()));
        assert!(ex.has_error());
    }

    /// Walks an exchange through input -> property -> output and checks the
    /// stored values, not just their presence.
    #[test]
    fn test_exchange_lifecycle() {
        let mut ex = Exchange::new(Message::new("input data"));
        assert_eq!(ex.input.body.as_text(), Some("input data"));

        ex.set_property("processed", Value::Bool(true));
        assert_eq!(ex.property("processed"), Some(&Value::Bool(true)));

        ex.output = Some(Message::new("output data"));
        assert_eq!(
            ex.output.as_ref().and_then(|out| out.body.as_text()),
            Some("output data")
        );

        assert!(!ex.has_error());
    }

    #[test]
    fn test_exchange_otel_context_default() {
        let ex = Exchange::default();
        assert!(!ex.otel_context.span().span_context().is_valid());
    }

    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        // NOTE(review): this only shows that cloning a default (span-less)
        // context still yields an invalid span context; it does not exercise
        // propagation of a real span.
        let ex = Exchange::default();
        let cloned = ex.clone();
        assert!(!cloned.otel_context.span().span_context().is_valid());
    }

    #[test]
    fn test_set_and_get_extension() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&u32> = ex.get_extension("my.key");
        assert_eq!(val, Some(&42u32));
    }

    #[test]
    fn test_get_extension_wrong_type_returns_none() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&String> = ex.get_extension("my.key");
        assert!(val.is_none());
    }

    #[test]
    fn test_get_extension_missing_key_returns_none() {
        let ex = Exchange::default();
        let val: Option<&u32> = ex.get_extension("nope");
        assert!(val.is_none());
    }

    /// Cloning an exchange must share extension values through the `Arc`
    /// rather than copying them.
    #[test]
    fn test_clone_shares_extension_arc() {
        let mut ex = Exchange::default();
        ex.set_extension("shared", Arc::new(99u64));
        assert_eq!(Arc::strong_count(&ex.extensions["shared"]), 1);

        let cloned = ex.clone();

        assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
        assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));
        // Value equality alone would also pass for a deep copy; the reference
        // count proves both maps point at the same allocation.
        assert_eq!(Arc::strong_count(&ex.extensions["shared"]), 2);
        assert_eq!(Arc::strong_count(&cloned.extensions["shared"]), 2);
    }
}