1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
//! ## Stop semantics (ADR-0025)
//!
//! This segment implements `OutcomePipeline` and propagates `PipelineOutcome::Stopped(ex)` with the exchange state intact (including mutations made inside the segment body before Stop fired). See ADR-0025 §3 (stopped-exchange-state-preservation invariant).
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
use camel_api::{
Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
};
// ── Metadata property keys ─────────────────────────────────────────────
/// Property key for the zero-based index of the endpoint being invoked.
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Property key indicating whether this is the last endpoint invocation.
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
// ── MulticastService ───────────────────────────────────────────────────
/// Tower Service implementing the Multicast EIP.
///
/// Sends a message to multiple endpoints, processing each independently,
/// and then aggregating the results.
///
/// Supports both sequential and parallel processing modes, configurable
/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
/// all endpoints are invoked concurrently with optional concurrency
/// limiting via [`MulticastConfig::parallel_limit`].
#[derive(Clone)]
pub struct MulticastService {
endpoints: Vec<BoxProcessor>,
config: MulticastConfig,
}
impl MulticastService {
/// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Result<Self, CamelError> {
config.validate()?;
Ok(Self { endpoints, config })
}
}
impl Service<Exchange> for MulticastService {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Do NOT aggregate endpoint readiness here.
// Each endpoint's readiness is checked per-endpoint inside
// process_sequential / process_parallel where stop_on_exception
// is respected. Fail-fast here would bypass that logic entirely.
Poll::Ready(Ok(()))
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
let original = exchange.clone();
let endpoints = self.endpoints.clone();
let config = self.config.clone();
Box::pin(async move {
// If no endpoints, return original exchange unchanged
if endpoints.is_empty() {
return Ok(original);
}
let total = endpoints.len();
let results = if config.parallel {
// Process endpoints in parallel
process_parallel(exchange, endpoints, config.parallel_limit, total).await
} else {
// Process each endpoint sequentially
process_sequential(exchange, endpoints, config.stop_on_exception, total).await
};
// Aggregate results per strategy
aggregate(results, original, config.aggregation)
})
}
}
// ── Sequential processing ──────────────────────────────────────────────
async fn process_sequential(
exchange: Exchange,
endpoints: Vec<BoxProcessor>,
stop_on_exception: bool,
total: usize,
) -> Vec<Result<Exchange, CamelError>> {
let mut results = Vec::with_capacity(endpoints.len());
for (i, endpoint) in endpoints.into_iter().enumerate() {
// Clone the exchange for each endpoint
let mut cloned_exchange = exchange.clone();
// Set multicast metadata properties
cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
let mut endpoint = endpoint;
match tower::ServiceExt::ready(&mut endpoint).await {
Err(e) => {
results.push(Err(e));
if stop_on_exception {
break;
}
}
Ok(svc) => {
let result = svc.call(cloned_exchange).await;
let is_err = result.is_err();
results.push(result);
if stop_on_exception && is_err {
break;
}
}
}
}
results
}
// ── Parallel processing ────────────────────────────────────────────────
async fn process_parallel(
exchange: Exchange,
endpoints: Vec<BoxProcessor>,
parallel_limit: Option<usize>,
total: usize,
) -> Vec<Result<Exchange, CamelError>> {
use std::sync::Arc;
use tokio::sync::Semaphore;
let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
// Build futures for each endpoint
let futures: Vec<_> = endpoints
.into_iter()
.enumerate()
.map(|(i, mut endpoint)| {
let mut ex = exchange.clone();
ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
let sem = semaphore.clone();
async move {
// Acquire semaphore permit if limit is set
let _permit = match &sem {
Some(s) => match s.acquire().await {
Ok(p) => Some(p),
Err(_) => {
return Err(CamelError::ProcessorError("semaphore closed".to_string()));
}
},
None => None,
};
// Readiness errors propagate via `?` into the per-endpoint result;
// join_all ensures all endpoints run independently (no early abort).
tower::ServiceExt::ready(&mut endpoint).await?;
endpoint.call(ex).await
}
})
.collect();
// Execute all futures concurrently and collect results
futures::future::join_all(futures).await
}
// ── Aggregation ────────────────────────────────────────────────────────
fn aggregate(
results: Vec<Result<Exchange, CamelError>>,
original: Exchange,
strategy: MulticastStrategy,
) -> Result<Exchange, CamelError> {
match strategy {
MulticastStrategy::LastWins => {
// Return the last result (error or success).
// If last result is Err and stop_on_exception=false, return that error.
results.into_iter().last().unwrap_or_else(|| Ok(original))
}
MulticastStrategy::CollectAll => {
// Collect all bodies into a JSON array. Errors propagate.
let mut bodies = Vec::new();
for result in results {
let ex = result?;
let value = match &ex.input.body {
Body::Text(s) => Value::String(s.clone()),
Body::Json(v) => v.clone(),
Body::Xml(s) => Value::String(s.clone()),
Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
Body::Empty => Value::Null,
Body::Stream(s) => serde_json::json!({
"_stream": {
"origin": s.metadata.origin,
"placeholder": true,
"hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
}
}),
};
bodies.push(value);
}
let mut out = original;
out.input.body = Body::Json(Value::Array(bodies));
Ok(out)
}
MulticastStrategy::Original => Ok(original),
MulticastStrategy::Custom(fold_fn) => {
// Fold using the custom function, starting from the first result.
let mut iter = results.into_iter();
let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
iter.try_fold(first, |acc, next_result| {
let next = next_result?;
Ok(fold_fn(acc, next))
})
}
}
}
#[cfg(test)]
#[path = "multicast_tests.rs"]
mod tests;