1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12 MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16pub struct Route {
19 pub(crate) from_uri: String,
21 pub(crate) pipeline: BoxProcessor,
23 pub(crate) concurrency: Option<ConcurrencyModel>,
26}
27
28impl Route {
29 pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
31 Self {
32 from_uri: from_uri.into(),
33 pipeline,
34 concurrency: None,
35 }
36 }
37
38 pub fn from_uri(&self) -> &str {
40 &self.from_uri
41 }
42
43 pub fn into_pipeline(self) -> BoxProcessor {
45 self.pipeline
46 }
47
48 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
50 self.concurrency = Some(model);
51 self
52 }
53
54 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
56 self.concurrency.as_ref()
57 }
58
59 pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
61 (self.pipeline, self.concurrency)
62 }
63}
64
65pub enum BuilderStep {
67 Processor(BoxProcessor),
69 To(String),
71 Split {
73 config: SplitterConfig,
74 steps: Vec<BuilderStep>,
75 },
76 Aggregate { config: AggregatorConfig },
78 Filter {
80 predicate: FilterPredicate,
81 steps: Vec<BuilderStep>,
82 },
83 WireTap { uri: String },
85 Multicast {
87 steps: Vec<BuilderStep>,
88 config: MulticastConfig,
89 },
90}
91
92impl std::fmt::Debug for BuilderStep {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 match self {
95 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
96 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
97 BuilderStep::Split { steps, .. } => {
98 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
99 }
100 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
101 BuilderStep::Filter { steps, .. } => {
102 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
103 }
104 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
105 BuilderStep::Multicast { steps, .. } => {
106 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
107 }
108 }
109 }
110}
111
112pub struct RouteDefinition {
114 pub(crate) from_uri: String,
115 pub(crate) steps: Vec<BuilderStep>,
116 pub(crate) error_handler: Option<ErrorHandlerConfig>,
118 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
120 pub(crate) concurrency: Option<ConcurrencyModel>,
123 pub(crate) route_id: Option<String>,
125 pub(crate) auto_startup: bool,
127 pub(crate) startup_order: i32,
129}
130
131impl RouteDefinition {
132 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
134 Self {
135 from_uri: from_uri.into(),
136 steps,
137 error_handler: None,
138 circuit_breaker: None,
139 concurrency: None,
140 route_id: None,
141 auto_startup: true,
142 startup_order: 1000,
143 }
144 }
145
146 pub fn from_uri(&self) -> &str {
148 &self.from_uri
149 }
150
151 pub fn steps(&self) -> &[BuilderStep] {
153 &self.steps
154 }
155
156 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
158 self.error_handler = Some(config);
159 self
160 }
161
162 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
164 self.circuit_breaker = Some(config);
165 self
166 }
167
168 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
170 self.circuit_breaker.as_ref()
171 }
172
173 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
175 self.concurrency.as_ref()
176 }
177
178 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
180 self.concurrency = Some(model);
181 self
182 }
183
184 pub fn route_id(&self) -> Option<&str> {
186 self.route_id.as_deref()
187 }
188
189 pub fn auto_startup(&self) -> bool {
191 self.auto_startup
192 }
193
194 pub fn startup_order(&self) -> i32 {
196 self.startup_order
197 }
198
199 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
201 self.route_id = Some(id.into());
202 self
203 }
204
205 pub fn with_auto_startup(mut self, auto: bool) -> Self {
207 self.auto_startup = auto;
208 self
209 }
210
211 pub fn with_startup_order(mut self, order: i32) -> Self {
213 self.startup_order = order;
214 self
215 }
216
217 pub fn to_info(&self) -> RouteDefinitionInfo {
220 RouteDefinitionInfo {
221 route_id: self.route_id.clone(),
222 auto_startup: self.auto_startup,
223 startup_order: self.startup_order,
224 }
225 }
226}
227
228#[derive(Clone)]
234pub struct RouteDefinitionInfo {
235 route_id: Option<String>,
236 auto_startup: bool,
237 startup_order: i32,
238}
239
240impl RouteDefinitionInfo {
241 pub fn route_id(&self) -> Option<&str> {
243 self.route_id.as_deref()
244 }
245
246 pub fn auto_startup(&self) -> bool {
248 self.auto_startup
249 }
250
251 pub fn startup_order(&self) -> i32 {
253 self.startup_order
254 }
255}
256
257pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
259 if processors.is_empty() {
260 return BoxProcessor::new(IdentityProcessor);
261 }
262 BoxProcessor::new(SequentialPipeline { steps: processors })
263}
264
265#[derive(Clone)]
267struct SequentialPipeline {
268 steps: Vec<BoxProcessor>,
269}
270
271impl Service<Exchange> for SequentialPipeline {
272 type Response = Exchange;
273 type Error = CamelError;
274 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
275
276 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277 if let Some(first) = self.steps.first_mut() {
278 first.poll_ready(cx)
279 } else {
280 Poll::Ready(Ok(()))
281 }
282 }
283
284 fn call(&mut self, exchange: Exchange) -> Self::Future {
285 let mut steps = self.steps.clone();
286 Box::pin(async move {
287 let mut ex = exchange;
288 for step in &mut steps {
289 ex = step.ready().await?.call(ex).await?;
290 }
291 Ok(ex)
292 })
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use camel_api::BoxProcessorExt;
300 use std::sync::Arc;
301 use std::sync::atomic::{AtomicBool, Ordering};
302
303 #[derive(Clone)]
305 struct DelayedReadyService {
306 ready: Arc<AtomicBool>,
307 }
308
309 impl Service<Exchange> for DelayedReadyService {
310 type Response = Exchange;
311 type Error = CamelError;
312 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
313
314 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
315 if self.ready.fetch_or(true, Ordering::SeqCst) {
316 Poll::Ready(Ok(()))
318 } else {
319 cx.waker().wake_by_ref();
321 Poll::Pending
322 }
323 }
324
325 fn call(&mut self, ex: Exchange) -> Self::Future {
326 Box::pin(async move { Ok(ex) })
327 }
328 }
329
330 #[test]
331 fn test_pipeline_poll_ready_delegates_to_first_step() {
332 let waker = futures::task::noop_waker();
333 let mut cx = Context::from_waker(&waker);
334
335 let inner = DelayedReadyService {
336 ready: Arc::new(AtomicBool::new(false)),
337 };
338 let boxed = BoxProcessor::new(inner);
339 let mut pipeline = SequentialPipeline { steps: vec![boxed] };
340
341 let first = pipeline.poll_ready(&mut cx);
343 assert!(first.is_pending(), "expected Pending on first poll_ready");
344
345 let second = pipeline.poll_ready(&mut cx);
347 assert!(second.is_ready(), "expected Ready on second poll_ready");
348 }
349
350 #[test]
351 fn test_pipeline_poll_ready_with_empty_steps() {
352 let waker = futures::task::noop_waker();
353 let mut cx = Context::from_waker(&waker);
354
355 let mut pipeline = SequentialPipeline { steps: vec![] };
356
357 let result = pipeline.poll_ready(&mut cx);
359 assert!(result.is_ready(), "expected Ready for empty pipeline");
360 }
361
362 #[tokio::test]
366 async fn test_pipeline_stops_gracefully_on_stopped_error() {
367 use std::sync::{
368 Arc,
369 atomic::{AtomicBool, Ordering},
370 };
371
372 let after_called = Arc::new(AtomicBool::new(false));
374 let after_called_clone = after_called.clone();
375
376 let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
377 let after_step = BoxProcessor::from_fn(move |ex| {
378 after_called_clone.store(true, Ordering::SeqCst);
379 Box::pin(async move { Ok(ex) })
380 });
381
382 let mut pipeline = SequentialPipeline {
383 steps: vec![stop_step, after_step],
384 };
385
386 let ex = Exchange::new(camel_api::Message::new("hello"));
387 let result = pipeline.call(ex).await;
388
389 assert!(
391 matches!(result, Err(CamelError::Stopped)),
392 "expected Err(Stopped), got: {:?}",
393 result
394 );
395 assert!(
397 !after_called.load(Ordering::SeqCst),
398 "step after stop should not be called"
399 );
400 }
401
402 #[test]
403 fn test_builder_step_multicast_variant() {
404 use camel_api::MulticastConfig;
405
406 let step = BuilderStep::Multicast {
407 steps: vec![BuilderStep::To("direct:a".into())],
408 config: MulticastConfig::new(),
409 };
410
411 assert!(matches!(step, BuilderStep::Multicast { .. }));
412 }
413
414 #[test]
415 fn test_route_definition_defaults() {
416 let def = RouteDefinition::new("direct:test", vec![]);
417 assert_eq!(def.route_id(), None);
418 assert!(def.auto_startup());
419 assert_eq!(def.startup_order(), 1000);
420 }
421
422 #[test]
423 fn test_route_definition_builders() {
424 let def = RouteDefinition::new("direct:test", vec![])
425 .with_route_id("my-route")
426 .with_auto_startup(false)
427 .with_startup_order(50);
428 assert_eq!(def.route_id(), Some("my-route"));
429 assert!(!def.auto_startup());
430 assert_eq!(def.startup_order(), 50);
431 }
432}