Skip to main content

camel_api/
exchange.rs

1use std::collections::HashMap;
2
3use opentelemetry::Context;
4use uuid::Uuid;
5
6use crate::error::CamelError;
7use crate::message::Message;
8use crate::value::Value;
9
/// Message exchange pattern: one-way (fire-and-forget) or request-reply.
///
/// [`ExchangePattern::InOnly`] is the default variant.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExchangePattern {
    /// One-way delivery: the message is sent and no reply is expected.
    #[default]
    InOnly,
    /// Two-way delivery: the message is sent and a reply is expected.
    InOut,
}
19
20/// An Exchange represents a message being routed through the framework.
21///
22/// It contains the input message, an optional output message,
23/// properties for passing data between processors, and error state.
24#[derive(Debug, Clone)]
25pub struct Exchange {
26    /// The input (incoming) message.
27    pub input: Message,
28    /// The output (response) message, populated for InOut patterns.
29    pub output: Option<Message>,
30    /// Exchange-scoped properties for passing data between processors.
31    pub properties: HashMap<String, Value>,
32    /// Error state, if processing failed.
33    pub error: Option<CamelError>,
34    /// The exchange pattern.
35    pub pattern: ExchangePattern,
36    /// Unique correlation ID for distributed tracing.
37    pub correlation_id: String,
38    /// OpenTelemetry context for distributed tracing propagation.
39    /// Carries the active span context between processing steps.
40    /// Defaults to an empty context (noop span) if OTel is not active.
41    pub otel_context: Context,
42}
43
44impl Exchange {
45    /// Create a new exchange with the given input message.
46    pub fn new(input: Message) -> Self {
47        Self {
48            input,
49            output: None,
50            properties: HashMap::new(),
51            error: None,
52            pattern: ExchangePattern::default(),
53            correlation_id: Uuid::new_v4().to_string(),
54            otel_context: Context::new(),
55        }
56    }
57
58    /// Create a new exchange with the InOut pattern.
59    pub fn new_in_out(input: Message) -> Self {
60        Self {
61            input,
62            output: None,
63            properties: HashMap::new(),
64            error: None,
65            pattern: ExchangePattern::InOut,
66            correlation_id: Uuid::new_v4().to_string(),
67            otel_context: Context::new(),
68        }
69    }
70
71    /// Get the correlation ID for this exchange.
72    pub fn correlation_id(&self) -> &str {
73        &self.correlation_id
74    }
75
76    /// Get a property value.
77    pub fn property(&self, key: &str) -> Option<&Value> {
78        self.properties.get(key)
79    }
80
81    /// Set a property value.
82    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
83        self.properties.insert(key.into(), value.into());
84    }
85
86    /// Check if the exchange has an error.
87    pub fn has_error(&self) -> bool {
88        self.error.is_some()
89    }
90
91    /// Set an error on this exchange.
92    pub fn set_error(&mut self, error: CamelError) {
93        self.error = Some(error);
94    }
95}
96
97impl Default for Exchange {
98    fn default() -> Self {
99        Self::new(Message::default())
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::trace::TraceContextExt;

    /// A freshly built exchange carries the input and starts otherwise empty.
    #[test]
    fn test_exchange_new() {
        let msg = Message::new("test");
        let ex = Exchange::new(msg);
        assert_eq!(ex.input.body.as_text(), Some("test"));
        assert!(ex.output.is_none());
        assert!(ex.properties.is_empty());
        assert!(!ex.has_error());
        assert_eq!(ex.pattern, ExchangePattern::InOnly);
    }

    /// `new_in_out` only differs from `new` by the exchange pattern.
    #[test]
    fn test_exchange_in_out() {
        let ex = Exchange::new_in_out(Message::default());
        assert_eq!(ex.pattern, ExchangePattern::InOut);
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
    }

    /// Each exchange gets its own non-empty correlation ID.
    #[test]
    fn test_exchange_correlation_id_is_unique() {
        let first = Exchange::default();
        let second = Exchange::default();
        assert!(!first.correlation_id().is_empty());
        assert_ne!(first.correlation_id(), second.correlation_id());
    }

    /// Properties can be stored and read back; unknown keys yield `None`.
    #[test]
    fn test_exchange_properties() {
        let mut ex = Exchange::default();
        ex.set_property("key", Value::Bool(true));
        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
        assert_eq!(ex.property("missing"), None);
    }

    /// Setting an error flips `has_error`.
    #[test]
    fn test_exchange_error() {
        let mut ex = Exchange::default();
        assert!(!ex.has_error());
        ex.set_error(CamelError::ProcessorError("test".into()));
        assert!(ex.has_error());
    }

    /// Walks an exchange through input, property, and output stages and
    /// verifies the stored values, not just their presence.
    #[test]
    fn test_exchange_lifecycle() {
        let mut ex = Exchange::new(Message::new("input data"));
        assert_eq!(ex.input.body.as_text(), Some("input data"));

        // Set a property and confirm it is actually stored.
        ex.set_property("processed", Value::Bool(true));
        assert_eq!(ex.property("processed"), Some(&Value::Bool(true)));

        // Set the output and confirm its content, not only that it exists.
        ex.output = Some(Message::new("output data"));
        assert_eq!(
            ex.output.as_ref().and_then(|m| m.body.as_text()),
            Some("output data")
        );

        // The input must be untouched and no error recorded.
        assert_eq!(ex.input.body.as_text(), Some("input data"));
        assert!(!ex.has_error());
    }

    /// A default exchange carries a fresh context whose span is not valid.
    #[test]
    fn test_exchange_otel_context_default() {
        let ex = Exchange::default();
        assert!(!ex.otel_context.span().span_context().is_valid());
    }

    /// Cloning keeps the (empty) context and the correlation ID on both sides.
    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        let ex = Exchange::default();
        let cloned = ex.clone();
        // Check the original as well as the clone, so "both" is really tested.
        assert!(!ex.otel_context.span().span_context().is_valid());
        assert!(!cloned.otel_context.span().span_context().is_valid());
        // A clone represents the same exchange, so the ID must be preserved.
        assert_eq!(cloned.correlation_id(), ex.correlation_id());
    }
}