use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use opentelemetry::Context;
use uuid::Uuid;
use crate::error::CamelError;
use crate::from_body::FromBody;
use crate::message::Message;
use crate::value::Value;
/// Message exchange pattern recorded on an [`Exchange`].
///
/// The default is [`ExchangePattern::InOnly`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ExchangePattern {
    /// Default pattern.
    ///
    /// NOTE(review): the name suggests one-way delivery with no reply
    /// expected — confirm against the code that consumes this value.
    #[default]
    InOnly,
    /// NOTE(review): the name suggests a request/reply interaction —
    /// confirm against the code that consumes this value.
    InOut,
}
/// A message exchange: an input message plus the state attached to it.
///
/// All fields are public; the inherent methods on this type are thin
/// convenience accessors over them.
#[derive(Debug)]
pub struct Exchange {
    /// Message the exchange was constructed with. `body_as` reads its body.
    pub input: Message,
    /// Optional output message; `None` on a freshly constructed exchange.
    pub output: Option<Message>,
    /// String-keyed properties, written through `set_property` and read
    /// through `property`.
    pub properties: HashMap<String, Value>,
    /// String-keyed, type-erased values held behind `Arc`, so cloning the
    /// map clones the handles rather than the values they point to.
    pub extensions: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Error recorded on the exchange, if any; `has_error` reports whether
    /// this is `Some`.
    pub error: Option<CamelError>,
    /// Exchange pattern; `new` uses the enum default, `new_in_out` uses
    /// `ExchangePattern::InOut`.
    pub pattern: ExchangePattern,
    /// Identifier generated as a UUID v4 string when the exchange is
    /// constructed.
    pub correlation_id: String,
    /// OpenTelemetry context carried with the exchange; the constructors
    /// start from `Context::new()`.
    pub otel_context: Context,
}
impl Exchange {
    /// Builds an exchange around `input` with the default pattern
    /// ([`ExchangePattern::InOnly`]).
    ///
    /// The new exchange has no output and no error, empty property and
    /// extension maps, a freshly generated UUID v4 correlation id and a new
    /// OpenTelemetry [`Context`].
    pub fn new(input: Message) -> Self {
        let correlation_id = Uuid::new_v4().to_string();
        Self {
            input,
            output: None,
            properties: HashMap::new(),
            extensions: HashMap::new(),
            error: None,
            pattern: ExchangePattern::default(),
            correlation_id,
            otel_context: Context::new(),
        }
    }

    /// Same as [`Exchange::new`], except the pattern is
    /// [`ExchangePattern::InOut`].
    pub fn new_in_out(input: Message) -> Self {
        let mut exchange = Self::new(input);
        exchange.pattern = ExchangePattern::InOut;
        exchange
    }

    /// Returns the correlation id assigned when the exchange was built.
    pub fn correlation_id(&self) -> &str {
        self.correlation_id.as_str()
    }

    /// Looks up the property stored under `key`, if any.
    pub fn property(&self, key: &str) -> Option<&Value> {
        let properties = &self.properties;
        properties.get(key)
    }

    /// Stores `value` under `key`, replacing any previous value for that key.
    pub fn set_property(&mut self, key: impl Into<String>, value: impl Into<Value>) {
        let name: String = key.into();
        let converted: Value = value.into();
        self.properties.insert(name, converted);
    }

    /// Returns `true` when an error has been recorded on the exchange.
    pub fn has_error(&self) -> bool {
        self.error.is_some()
    }

    /// Records `error` on the exchange, overwriting any earlier one.
    pub fn set_error(&mut self, error: CamelError) {
        self.error = Some(error);
    }

    /// Stores the shared, type-erased `value` under `key`, replacing any
    /// previous extension registered for that key.
    pub fn set_extension(&mut self, key: impl Into<String>, value: Arc<dyn Any + Send + Sync>) {
        let name: String = key.into();
        self.extensions.insert(name, value);
    }

    /// Borrows the extension stored under `key` as a `T`.
    ///
    /// Returns `None` when the key is absent or when the stored value is
    /// not of type `T`.
    pub fn get_extension<T: Any>(&self, key: &str) -> Option<&T> {
        self.extensions
            .get(key)
            .and_then(|stored| stored.downcast_ref::<T>())
    }

    /// Converts the body of the input message into `T` through its
    /// [`FromBody`] implementation, forwarding whatever error it reports.
    pub fn body_as<T: FromBody>(&self) -> Result<T, CamelError> {
        let body = &self.input.body;
        <T as FromBody>::from_body(body)
    }
}
impl Clone for Exchange {
    /// Clones every field of the exchange.
    ///
    /// The extension map is cloned entry by entry, which clones the `Arc`
    /// handles rather than the values behind them. The correlation id is
    /// copied as-is, not regenerated.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to `Exchange` without
        // handling it here becomes a compile error.
        let Self {
            input,
            output,
            properties,
            extensions,
            error,
            pattern,
            correlation_id,
            otel_context,
        } = self;

        Self {
            input: input.clone(),
            output: output.clone(),
            properties: properties.clone(),
            extensions: extensions.clone(),
            error: error.clone(),
            pattern: *pattern,
            correlation_id: correlation_id.clone(),
            otel_context: otel_context.clone(),
        }
    }
}
impl Default for Exchange {
fn default() -> Self {
Self::new(Message::default())
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for `Exchange` construction, accessors, cloning and body
    //! conversion.

    use super::*;
    use crate::Body;
    use opentelemetry::trace::TraceContextExt;
    use serde_json::json;
    use std::sync::Arc;

    #[test]
    fn test_exchange_new() {
        let msg = Message::new("test");
        let ex = Exchange::new(msg);
        assert_eq!(ex.input.body.as_text(), Some("test"));
        assert!(ex.output.is_none());
        assert!(!ex.has_error());
        assert_eq!(ex.pattern, ExchangePattern::InOnly);
    }

    #[test]
    fn test_exchange_in_out() {
        let ex = Exchange::new_in_out(Message::default());
        assert_eq!(ex.pattern, ExchangePattern::InOut);
    }

    #[test]
    fn test_exchange_correlation_id_unique_and_cloned() {
        let first = Exchange::default();
        let second = Exchange::default();
        assert!(!first.correlation_id().is_empty());
        // Each construction generates its own id ...
        assert_ne!(first.correlation_id(), second.correlation_id());
        // ... while a clone keeps the id of the exchange it was cloned from.
        let cloned = first.clone();
        assert_eq!(cloned.correlation_id(), first.correlation_id());
    }

    #[test]
    fn test_exchange_properties() {
        let mut ex = Exchange::default();
        ex.set_property("key", Value::Bool(true));
        assert_eq!(ex.property("key"), Some(&Value::Bool(true)));
        assert_eq!(ex.property("missing"), None);
    }

    #[test]
    fn test_exchange_error() {
        let mut ex = Exchange::default();
        assert!(!ex.has_error());
        ex.set_error(CamelError::ProcessorError("test".into()));
        assert!(ex.has_error());
    }

    #[test]
    fn test_exchange_lifecycle() {
        let mut ex = Exchange::new(Message::new("input data"));
        assert_eq!(ex.input.body.as_text(), Some("input data"));
        ex.set_property("processed", Value::Bool(true));
        ex.output = Some(Message::new("output data"));
        assert!(ex.output.is_some());
        assert!(!ex.has_error());
    }

    #[test]
    fn test_exchange_otel_context_default() {
        let ex = Exchange::default();
        assert!(!ex.otel_context.span().span_context().is_valid());
    }

    // NOTE(review): this only covers the empty-context case; it does not
    // install a valid span context before cloning.
    #[test]
    fn test_exchange_otel_context_propagates_in_clone() {
        let ex = Exchange::default();
        let cloned = ex.clone();
        assert!(!cloned.otel_context.span().span_context().is_valid());
    }

    #[test]
    fn test_set_and_get_extension() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&u32> = ex.get_extension("my.key");
        assert_eq!(val, Some(&42u32));
    }

    #[test]
    fn test_get_extension_wrong_type_returns_none() {
        let mut ex = Exchange::default();
        ex.set_extension("my.key", Arc::new(42u32));
        let val: Option<&String> = ex.get_extension("my.key");
        assert!(val.is_none());
    }

    #[test]
    fn test_get_extension_missing_key_returns_none() {
        let ex = Exchange::default();
        let val: Option<&u32> = ex.get_extension("nope");
        assert!(val.is_none());
    }

    #[test]
    fn test_clone_shares_extension_arc() {
        let mut ex = Exchange::default();
        ex.set_extension("shared", Arc::new(99u64));
        let cloned = ex.clone();
        assert_eq!(ex.get_extension::<u64>("shared"), Some(&99u64));
        assert_eq!(cloned.get_extension::<u64>("shared"), Some(&99u64));

        // Equal values would also be observed after a deep copy, so check
        // that both maps hold handles to the very same allocation.
        let original_arc = ex
            .extensions
            .get("shared")
            .expect("extension present on original");
        let cloned_arc = cloned
            .extensions
            .get("shared")
            .expect("extension present on clone");
        assert!(Arc::ptr_eq(original_arc, cloned_arc));
    }

    #[test]
    fn test_body_as_string_from_text() {
        let ex = Exchange::new(Message::new(Body::Text("hello".to_string())));
        let result = ex.body_as::<String>();
        assert_eq!(result.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_string_from_json_string() {
        let ex = Exchange::new(Message::new(Body::Json(json!("hello"))));
        let result = ex.body_as::<String>();
        assert_eq!(result.unwrap(), "hello");
    }

    #[test]
    fn test_body_as_json_value_from_json_number() {
        let ex = Exchange::new(Message::new(Body::Json(json!(42))));
        let result = ex.body_as::<serde_json::Value>();
        assert_eq!(result.unwrap(), json!(42));
    }

    #[test]
    fn test_body_as_vec_u8_from_bytes() {
        let ex = Exchange::new(Message::new(Body::from(vec![1u8, 2, 3, 4])));
        let result = ex.body_as::<Vec<u8>>();
        assert_eq!(result.unwrap(), vec![1u8, 2, 3, 4]);
    }

    #[test]
    fn test_body_as_string_from_empty_returns_err() {
        let ex = Exchange::new(Message::new(Body::Empty));
        let result = ex.body_as::<String>();
        assert!(result.is_err());
    }
}