use super::*;
use camel_api::{BoxProcessorExt, Message};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tower::ServiceExt;
/// Builds an [`Exchange`] whose input message is created from `body`.
fn make_exchange(body: &str) -> Exchange {
    let message = Message::new(body);
    Exchange::new(message)
}
/// Returns a processor that upper-cases the input body when it is `Body::Text`.
///
/// Any other body variant is left untouched, and the processor always returns `Ok`.
fn uppercase_processor() -> BoxProcessor {
    BoxProcessor::from_fn(|mut exchange: Exchange| {
        Box::pin(async move {
            // Compute the replacement first so the borrow of the body ends before assignment.
            let upper = match &exchange.input.body {
                Body::Text(text) => Some(text.to_uppercase()),
                _ => None,
            };
            if let Some(upper) = upper {
                exchange.input.body = Body::Text(upper);
            }
            Ok(exchange)
        })
    })
}
/// Returns a processor that always fails with `CamelError::ProcessorError("boom")`.
fn failing_processor() -> BoxProcessor {
    BoxProcessor::from_fn(|_ex| {
        Box::pin(async {
            let failure = CamelError::ProcessorError("boom".into());
            Err(failure)
        })
    })
}
/// Constructing a parallel multicast with `parallel_limit(0)` is expected to return `Err`.
#[test]
fn test_multicast_zero_parallel_limit_rejected() {
    let endpoints = vec![passthrough_processor()];
    let config = MulticastConfig::new().parallel(true).parallel_limit(0);

    let outcome = MulticastService::new(endpoints, config);

    assert!(outcome.is_err(), "zero parallel_limit should return Err");
}
/// Returns a processor that hands the exchange back unchanged.
fn passthrough_processor() -> BoxProcessor {
    BoxProcessor::from_fn(|exchange| {
        Box::pin(async move {
            // Identity step: the exchange is neither inspected nor modified.
            Ok(exchange)
        })
    })
}
#[tokio::test]
async fn test_multicast_sequential_last_wins() {
let endpoints = vec![
uppercase_processor(),
uppercase_processor(),
uppercase_processor(),
];
let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config).unwrap();
let result = svc
.ready()
.await
.unwrap()
.call(make_exchange("hello"))
.await
.unwrap();
assert_eq!(result.input.body.as_text(), Some("HELLO"));
}
#[tokio::test]
async fn test_multicast_sequential_collect_all() {
let endpoints = vec![
uppercase_processor(),
uppercase_processor(),
uppercase_processor(),
];
let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
let mut svc = MulticastService::new(endpoints, config).unwrap();
let result = svc
.ready()
.await
.unwrap()
.call(make_exchange("hello"))
.await
.unwrap();
let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
match &result.input.body {
Body::Json(v) => assert_eq!(*v, expected),
other => panic!("expected JSON body, got {other:?}"),
}
}
#[tokio::test]
async fn test_multicast_sequential_original() {
let endpoints = vec![
uppercase_processor(),
uppercase_processor(),
uppercase_processor(),
];
let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
let mut svc = MulticastService::new(endpoints, config).unwrap();
let result = svc
.ready()
.await
.unwrap()
.call(make_exchange("hello"))
.await
.unwrap();
assert_eq!(result.input.body.as_text(), Some("hello"));
}
#[tokio::test]
async fn test_multicast_sequential_custom_aggregation() {
let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
Arc::new(|mut acc: Exchange, next: Exchange| {
let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
let next_text = next.input.body.as_text().unwrap_or("").to_string();
acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
acc
});
let endpoints = vec![
uppercase_processor(),
uppercase_processor(),
uppercase_processor(),
];
let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
let mut svc = MulticastService::new(endpoints, config).unwrap();
let result = svc
.ready()
.await
.unwrap()
.call(make_exchange("a"))
.await
.unwrap();
assert_eq!(result.input.body.as_text(), Some("A+A+A"));
}
/// With `stop_on_exception(true)`, a failing middle endpoint is expected to make the
/// whole call return `Err`.
#[tokio::test]
async fn test_multicast_stop_on_exception() {
    let config = MulticastConfig::new().stop_on_exception(true);
    let endpoints = vec![
        uppercase_processor(),
        failing_processor(),
        uppercase_processor(),
    ];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("hello")).await;

    assert!(result.is_err(), "expected error due to stop_on_exception");
}
/// With `stop_on_exception(false)` and `LastWins`, a failure in the middle endpoint is
/// expected to be tolerated as long as the last endpoint succeeds.
#[tokio::test]
async fn test_multicast_continue_on_exception() {
    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::LastWins);
    let endpoints = vec![
        uppercase_processor(),
        failing_processor(),
        uppercase_processor(),
    ];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("hello")).await;

    assert!(result.is_ok(), "last endpoint should succeed");
    let exchange = result.unwrap();
    assert_eq!(exchange.input.body.as_text(), Some("HELLO"));
}
/// With `stop_on_exception(true)`, the endpoint after the failing one is expected not
/// to run: a shared counter must read 2 after the call.
#[tokio::test]
async fn test_multicast_stop_on_exception_halts_early() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

    // Incremented once by every endpoint that actually executes.
    let executed = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&executed);
    let first = BoxProcessor::from_fn(move |ex: Exchange| {
        let counter = Arc::clone(&counter);
        Box::pin(async move {
            counter.fetch_add(1, AtomicOrdering::SeqCst);
            Ok(ex)
        })
    });

    let counter = Arc::clone(&executed);
    let second = BoxProcessor::from_fn(move |_ex: Exchange| {
        let counter = Arc::clone(&counter);
        Box::pin(async move {
            // Counts itself before failing so the test can tell it ran.
            counter.fetch_add(1, AtomicOrdering::SeqCst);
            Err(CamelError::ProcessorError("fail on 1".into()))
        })
    });

    let counter = Arc::clone(&executed);
    let third = BoxProcessor::from_fn(move |ex: Exchange| {
        let counter = Arc::clone(&counter);
        Box::pin(async move {
            counter.fetch_add(1, AtomicOrdering::SeqCst);
            Ok(ex)
        })
    });

    let config = MulticastConfig::new().stop_on_exception(true);
    let mut svc = MulticastService::new(vec![first, second, third], config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("x")).await;
    assert!(result.is_err(), "should fail at endpoint 1");

    let count = executed.load(AtomicOrdering::SeqCst);
    assert_eq!(
        count, 2,
        "endpoint 2 should not have executed due to stop_on_exception"
    );
}
/// With `stop_on_exception(false)`, every endpoint is expected to run despite the
/// failure in the middle: a shared counter must read 3 after the call.
#[tokio::test]
async fn test_multicast_continue_on_exception_executes_all() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

    // Incremented once by every endpoint that actually executes.
    let executed = Arc::new(AtomicUsize::new(0));

    // Builds an endpoint that counts its execution and passes the exchange through.
    let counting_ok = || {
        let hits = Arc::clone(&executed);
        BoxProcessor::from_fn(move |ex: Exchange| {
            let hits = Arc::clone(&hits);
            Box::pin(async move {
                hits.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        })
    };

    let endpoint0 = counting_ok();

    let fail_hits = Arc::clone(&executed);
    let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
        let hits = Arc::clone(&fail_hits);
        Box::pin(async move {
            // Counts itself before failing so the test can tell it ran.
            hits.fetch_add(1, AtomicOrdering::SeqCst);
            Err(CamelError::ProcessorError("fail on 1".into()))
        })
    });

    let endpoint2 = counting_ok();

    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::LastWins);
    let mut svc =
        MulticastService::new(vec![endpoint0, endpoint1, endpoint2], config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("x")).await;
    assert!(result.is_ok(), "last endpoint should succeed");

    let count = executed.load(AtomicOrdering::SeqCst);
    assert_eq!(
        count, 3,
        "all endpoints should have executed despite error in endpoint 1"
    );
}
/// A multicast with no endpoints is expected to return the input exchange with its
/// body and properties intact.
#[tokio::test]
async fn test_multicast_empty_endpoints() {
    let mut input = make_exchange("hello");
    input.set_property("marker", Value::Bool(true));

    let no_endpoints: Vec<BoxProcessor> = Vec::new();
    let mut svc = MulticastService::new(no_endpoints, MulticastConfig::new()).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(input).await.unwrap();

    assert_eq!(result.input.body.as_text(), Some("hello"));
    assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
}
/// Each endpoint is expected to see its own `CAMEL_MULTICAST_INDEX`, and only the last
/// one is expected to see `CAMEL_MULTICAST_COMPLETE` set to `true`.
#[tokio::test]
async fn test_multicast_metadata_properties() {
    // Replaces the body with a JSON snapshot of the multicast properties it observed.
    let recorder = BoxProcessor::from_fn(|ex: Exchange| {
        Box::pin(async move {
            let snapshot = serde_json::json!({
                "index": ex.property(CAMEL_MULTICAST_INDEX).cloned(),
                "complete": ex.property(CAMEL_MULTICAST_COMPLETE).cloned(),
            });
            let mut out = ex;
            out.input.body = Body::Json(snapshot);
            Ok(out)
        })
    });

    let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
    let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("x")).await.unwrap();

    let expected = serde_json::json!([
        {"index": 0, "complete": false},
        {"index": 1, "complete": false},
        {"index": 2, "complete": true},
    ]);
    if let Body::Json(snapshots) = &result.input.body {
        assert_eq!(*snapshots, expected);
    } else {
        let other = &result.input.body;
        panic!("expected JSON body, got {other:?}");
    }
}
/// `MulticastService::poll_ready` is expected to report `Ready(Ok(()))` even though the
/// wrapped endpoint's own `poll_ready` always returns `Pending`.
#[tokio::test]
async fn test_poll_ready_returns_ready_immediately() {
    use std::sync::atomic::AtomicBool;

    /// Endpoint whose readiness check never completes.
    #[derive(Clone)]
    struct NeverReady {
        _ready: Arc<AtomicBool>,
    }

    impl Service<Exchange> for NeverReady {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // Wake the task right away, then report "not ready" yet again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            Box::pin(async move { Ok(exchange) })
        }
    }

    let never_ready = NeverReady {
        _ready: Arc::new(AtomicBool::new(false)),
    };
    let endpoint: BoxProcessor = BoxProcessor::new(never_ready);
    let mut svc = MulticastService::new(vec![endpoint], MulticastConfig::new()).unwrap();

    // Poll once by hand with a no-op waker instead of awaiting readiness.
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    let readiness = Pin::new(&mut svc).poll_ready(&mut cx);

    assert!(
        matches!(readiness, Poll::Ready(Ok(()))),
        "expected Ready(Ok(())) even when endpoint is not ready"
    );
}
/// With `CollectAll` and `stop_on_exception(false)`, a failing endpoint is still
/// expected to make the call return `Err`.
#[tokio::test]
async fn test_multicast_collect_all_error_propagates() {
    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::CollectAll);
    let endpoints = vec![
        uppercase_processor(),
        failing_processor(),
        uppercase_processor(),
    ];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("hello")).await;

    assert!(result.is_err(), "CollectAll should propagate first error");
}
/// With `LastWins` and `stop_on_exception(false)`, a failure in the final endpoint is
/// expected to surface as the call's `Err`.
#[tokio::test]
async fn test_multicast_last_wins_error_last() {
    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::LastWins);
    let endpoints = vec![
        uppercase_processor(),
        uppercase_processor(),
        failing_processor(),
    ];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("hello")).await;

    assert!(result.is_err(), "LastWins should return last error");
}
/// With a `Custom` aggregation and `stop_on_exception(false)`, a failing endpoint is
/// still expected to make the call return `Err`.
#[tokio::test]
async fn test_multicast_custom_error_propagates() {
    // Aggregator that always keeps the accumulated exchange and drops the new one.
    let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
        Arc::new(|acc: Exchange, _next: Exchange| acc);

    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::Custom(joiner));
    let endpoints = vec![
        uppercase_processor(),
        failing_processor(),
        uppercase_processor(),
    ];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("hello")).await;

    assert!(
        result.is_err(),
        "Custom aggregation should propagate errors"
    );
}
/// A parallel `CollectAll` multicast over two upper-casing endpoints is expected to
/// produce a two-element JSON array of upper-cased strings.
#[tokio::test]
async fn test_multicast_parallel_basic() {
    let config = MulticastConfig::new()
        .parallel(true)
        .aggregation(MulticastStrategy::CollectAll);
    let endpoints = vec![uppercase_processor(), uppercase_processor()];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("test")).await.unwrap();

    if let Body::Json(collected) = &result.input.body {
        let arr = collected.as_array().expect("expected array");
        assert_eq!(arr.len(), 2);
        assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
    } else {
        let other = &result.input.body;
        panic!("expected JSON body, got {:?}", other);
    }
}
/// Runs four endpoints in parallel with `parallel_limit(2)` and checks that no more
/// than two of them were ever in flight at the same time.
///
/// Each endpoint bumps a shared in-flight counter, records the high-water mark, yields
/// once so sibling endpoints get a chance to overlap, and then decrements the counter.
#[tokio::test]
async fn test_multicast_parallel_with_limit() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

    // `concurrent` tracks endpoints currently running; `max_concurrent` is its peak.
    let concurrent = Arc::new(AtomicUsize::new(0));
    let max_concurrent = Arc::new(AtomicUsize::new(0));

    let endpoints: Vec<BoxProcessor> = (0..4)
        .map(|_| {
            let c = Arc::clone(&concurrent);
            let mc = Arc::clone(&max_concurrent);
            BoxProcessor::from_fn(move |ex: Exchange| {
                let c = Arc::clone(&c);
                let mc = Arc::clone(&mc);
                Box::pin(async move {
                    let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                    mc.fetch_max(current, AtomicOrdering::SeqCst);
                    // Suspend while still counted as in flight so overlap can be observed.
                    tokio::task::yield_now().await;
                    c.fetch_sub(1, AtomicOrdering::SeqCst);
                    Ok(ex)
                })
            })
        })
        .collect();

    let config = MulticastConfig::new().parallel(true).parallel_limit(2);
    let mut svc = MulticastService::new(endpoints, config).unwrap();
    let result = svc.ready().await.unwrap().call(make_exchange("x")).await;

    // Every endpoint is infallible, so a failure here means the multicast itself broke.
    // Without this check the concurrency assertion below could pass vacuously.
    assert!(
        result.is_ok(),
        "parallel multicast of infallible endpoints should succeed; got: {:?}",
        result.err()
    );

    let observed_max = max_concurrent.load(AtomicOrdering::SeqCst);
    // A peak of zero would mean no endpoint ran at all, which would trivially satisfy
    // the upper bound.
    assert!(
        observed_max >= 1,
        "no endpoint appears to have executed (max concurrency was {})",
        observed_max
    );
    assert!(
        observed_max <= 2,
        "max concurrency was {}, expected <= 2",
        observed_max
    );
}
/// Runs a `CollectAll` multicast whose single endpoint replaces the body with a
/// `Body::Stream` carrying `origin` in its metadata, and returns the resulting exchange.
///
/// Panics if the service cannot be built, is not ready, or the call fails.
async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
    use bytes::Bytes;
    use camel_api::{Body, StreamBody, StreamMetadata};
    use futures::stream;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    // One-chunk stream; only its metadata matters to the callers' assertions.
    let chunks = vec![Ok(Bytes::from("test"))];
    let metadata = StreamMetadata {
        origin,
        ..Default::default()
    };
    let stream_body = StreamBody {
        stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
        metadata,
    };

    let shared_body = stream_body.clone();
    let stream_endpoint = BoxProcessor::from_fn(move |ex: Exchange| {
        let body_for_call = shared_body.clone();
        Box::pin(async move {
            let mut out = ex;
            out.input.body = Body::Stream(body_for_call);
            Ok(out)
        })
    });

    let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
    let mut svc = MulticastService::new(vec![stream_endpoint], config).unwrap();

    let ready = svc.ready().await.unwrap();
    let input = Exchange::new(Message::new(""));
    ready.call(input).await.unwrap()
}
/// A collected stream body is expected to become a `_stream` placeholder object that
/// carries the stream's origin and survives a JSON serialize/parse round trip.
#[tokio::test]
async fn test_multicast_stream_bodies_creates_valid_json() {
    use camel_api::Body;

    let origin = Some("http://example.com/data".to_string());
    let exchange = setup_multicast_stream_test(origin).await;

    let value = match &exchange.input.body {
        Body::Json(value) => value,
        other => panic!("Expected Json body, got {:?}", other),
    };

    // Round-trip through text to make sure the aggregated value is serializable JSON.
    let serialized = serde_json::to_string(value).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();

    assert!(parsed.is_array());
    let arr = parsed.as_array().unwrap();
    assert_eq!(arr.len(), 1);
    assert!(arr[0]["_stream"].is_object());
    let placeholder = &arr[0]["_stream"];
    assert_eq!(placeholder["origin"], "http://example.com/data");
    assert_eq!(placeholder["placeholder"], true);
}
/// A collected stream body without an origin is expected to become a `_stream`
/// placeholder object whose `origin` is JSON `null`.
#[tokio::test]
async fn test_multicast_stream_with_none_origin_creates_valid_json() {
    use camel_api::Body;

    let exchange = setup_multicast_stream_test(None).await;

    let value = match &exchange.input.body {
        Body::Json(value) => value,
        other => panic!("Expected Json body, got {:?}", other),
    };

    // Round-trip through text to make sure the aggregated value is serializable JSON.
    let serialized = serde_json::to_string(value).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();

    assert!(parsed.is_array());
    let arr = parsed.as_array().unwrap();
    assert_eq!(arr.len(), 1);
    assert!(arr[0]["_stream"].is_object());
    let placeholder = &arr[0]["_stream"];
    assert_eq!(placeholder["origin"], serde_json::Value::Null);
    assert_eq!(placeholder["placeholder"], true);
}
/// An endpoint whose `poll_ready` returns `Err` is expected not to fail the multicast's
/// own readiness, and its `call` is expected never to be invoked.
#[tokio::test]
async fn test_poll_ready_error_does_not_poison_multicast() {
    use std::sync::atomic::AtomicUsize;

    /// Endpoint that reports a readiness error and counts how often `call` runs.
    #[derive(Clone)]
    struct FailingReadyService {
        call_count: Arc<AtomicUsize>,
    }

    impl Service<Exchange> for FailingReadyService {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Err(CamelError::ProcessorError("ready-fail".into())))
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move { Ok(exchange) })
        }
    }

    let call_count = Arc::new(AtomicUsize::new(0));
    let never_ready: BoxProcessor = BoxProcessor::new(FailingReadyService {
        call_count: Arc::clone(&call_count),
    });

    let config = MulticastConfig::new()
        .stop_on_exception(false)
        .aggregation(MulticastStrategy::LastWins);
    let endpoints = vec![uppercase_processor(), never_ready, uppercase_processor()];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let readiness = svc.ready().await;
    assert!(
        readiness.is_ok(),
        "poll_ready should not fail-fast; got error: {:?}",
        readiness.err()
    );

    let outcome = readiness.unwrap().call(make_exchange("hello")).await;
    assert!(
        outcome.is_ok(),
        "LastWins with last endpoint succeeding should return Ok; got: {:?}",
        outcome.err()
    );
    assert_eq!(
        outcome.unwrap().input.body.as_text(),
        Some("HELLO"),
        "last successful endpoint should have uppercased"
    );

    // The endpoint with the failing readiness check must never have been called.
    assert_eq!(call_count.load(Ordering::SeqCst), 0);
}
/// `CollectAll` is expected to render a bytes body and an XML body as JSON strings and
/// an empty body as JSON `null`.
#[tokio::test]
async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
    // Replaces the body with the two bytes 65 and 66 ("AB").
    let bytes_endpoint = BoxProcessor::from_fn(|mut ex: Exchange| {
        Box::pin(async move {
            ex.input.body = Body::Bytes(vec![65, 66].into());
            Ok(ex)
        })
    });
    // Replaces the body with a small XML document.
    let xml_endpoint = BoxProcessor::from_fn(|mut ex: Exchange| {
        Box::pin(async move {
            ex.input.body = Body::Xml("<a/>".to_string());
            Ok(ex)
        })
    });
    // Clears the body entirely.
    let empty_endpoint = BoxProcessor::from_fn(|mut ex: Exchange| {
        Box::pin(async move {
            ex.input.body = Body::Empty;
            Ok(ex)
        })
    });

    let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
    let endpoints = vec![bytes_endpoint, xml_endpoint, empty_endpoint];
    let mut svc = MulticastService::new(endpoints, config).unwrap();

    let ready = svc.ready().await.unwrap();
    let result = ready.call(make_exchange("x")).await.unwrap();

    if let Body::Json(collected) = result.input.body {
        assert_eq!(collected, serde_json::json!(["AB", "<a/>", null]));
    } else {
        panic!("expected json");
    }
}