1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12 MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16use crate::config::DetailLevel;
17use crate::tracer::TracingProcessor;
18
19pub struct Route {
22 pub(crate) from_uri: String,
24 pub(crate) pipeline: BoxProcessor,
26 pub(crate) concurrency: Option<ConcurrencyModel>,
29}
30
31impl Route {
32 pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
34 Self {
35 from_uri: from_uri.into(),
36 pipeline,
37 concurrency: None,
38 }
39 }
40
41 pub fn from_uri(&self) -> &str {
43 &self.from_uri
44 }
45
46 pub fn into_pipeline(self) -> BoxProcessor {
48 self.pipeline
49 }
50
51 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
53 self.concurrency = Some(model);
54 self
55 }
56
57 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
59 self.concurrency.as_ref()
60 }
61
62 pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
64 (self.pipeline, self.concurrency)
65 }
66}
67
68pub struct WhenStep {
70 pub predicate: FilterPredicate,
71 pub steps: Vec<BuilderStep>,
72}
73
74pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
75
76#[derive(Debug)]
78pub struct DeclarativeWhenStep {
79 pub predicate: LanguageExpressionDef,
80 pub steps: Vec<BuilderStep>,
81}
82
83pub enum BuilderStep {
85 Processor(BoxProcessor),
87 To(String),
89 DeclarativeSetHeader { key: String, value: ValueSourceDef },
91 DeclarativeSetBody { value: ValueSourceDef },
93 DeclarativeFilter {
95 predicate: LanguageExpressionDef,
96 steps: Vec<BuilderStep>,
97 },
98 DeclarativeChoice {
100 whens: Vec<DeclarativeWhenStep>,
101 otherwise: Option<Vec<BuilderStep>>,
102 },
103 DeclarativeScript { expression: LanguageExpressionDef },
105 DeclarativeSplit {
107 expression: LanguageExpressionDef,
108 aggregation: camel_api::splitter::AggregationStrategy,
109 parallel: bool,
110 parallel_limit: Option<usize>,
111 stop_on_exception: bool,
112 steps: Vec<BuilderStep>,
113 },
114 Split {
116 config: SplitterConfig,
117 steps: Vec<BuilderStep>,
118 },
119 Aggregate { config: AggregatorConfig },
121 Filter {
123 predicate: FilterPredicate,
124 steps: Vec<BuilderStep>,
125 },
126 Choice {
129 whens: Vec<WhenStep>,
130 otherwise: Option<Vec<BuilderStep>>,
131 },
132 WireTap { uri: String },
134 Multicast {
136 steps: Vec<BuilderStep>,
137 config: MulticastConfig,
138 },
139 DeclarativeLog {
141 level: camel_processor::LogLevel,
142 message: ValueSourceDef,
143 },
144 Bean { name: String, method: String },
146}
147
148impl std::fmt::Debug for BuilderStep {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
152 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
153 BuilderStep::DeclarativeSetHeader { key, .. } => {
154 write!(
155 f,
156 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
157 )
158 }
159 BuilderStep::DeclarativeSetBody { .. } => {
160 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
161 }
162 BuilderStep::DeclarativeFilter { steps, .. } => {
163 write!(
164 f,
165 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
166 )
167 }
168 BuilderStep::DeclarativeChoice { whens, otherwise } => {
169 write!(
170 f,
171 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
172 whens.len(),
173 if otherwise.is_some() { "Some" } else { "None" }
174 )
175 }
176 BuilderStep::DeclarativeScript { expression } => write!(
177 f,
178 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
179 expression.language
180 ),
181 BuilderStep::DeclarativeSplit { steps, .. } => {
182 write!(
183 f,
184 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
185 )
186 }
187 BuilderStep::Split { steps, .. } => {
188 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
189 }
190 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
191 BuilderStep::Filter { steps, .. } => {
192 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
193 }
194 BuilderStep::Choice { whens, otherwise } => {
195 write!(
196 f,
197 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
198 whens.len(),
199 if otherwise.is_some() { "Some" } else { "None" }
200 )
201 }
202 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
203 BuilderStep::Multicast { steps, .. } => {
204 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
205 }
206 BuilderStep::DeclarativeLog { level, .. } => {
207 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
208 }
209 BuilderStep::Bean { name, method } => {
210 write!(
211 f,
212 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
213 )
214 }
215 }
216 }
217}
218
219pub struct RouteDefinition {
221 pub(crate) from_uri: String,
222 pub(crate) steps: Vec<BuilderStep>,
223 pub(crate) error_handler: Option<ErrorHandlerConfig>,
225 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
227 pub(crate) concurrency: Option<ConcurrencyModel>,
230 pub(crate) route_id: String,
232 pub(crate) auto_startup: bool,
234 pub(crate) startup_order: i32,
236}
237
238impl RouteDefinition {
239 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
241 Self {
242 from_uri: from_uri.into(),
243 steps,
244 error_handler: None,
245 circuit_breaker: None,
246 concurrency: None,
247 route_id: String::new(), auto_startup: true,
249 startup_order: 1000,
250 }
251 }
252
253 pub fn from_uri(&self) -> &str {
255 &self.from_uri
256 }
257
258 pub fn steps(&self) -> &[BuilderStep] {
260 &self.steps
261 }
262
263 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
265 self.error_handler = Some(config);
266 self
267 }
268
269 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
271 self.circuit_breaker = Some(config);
272 self
273 }
274
275 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
277 self.circuit_breaker.as_ref()
278 }
279
280 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
282 self.concurrency.as_ref()
283 }
284
285 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
287 self.concurrency = Some(model);
288 self
289 }
290
291 pub fn route_id(&self) -> &str {
293 &self.route_id
294 }
295
296 pub fn auto_startup(&self) -> bool {
298 self.auto_startup
299 }
300
301 pub fn startup_order(&self) -> i32 {
303 self.startup_order
304 }
305
306 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
308 self.route_id = id.into();
309 self
310 }
311
312 pub fn with_auto_startup(mut self, auto: bool) -> Self {
314 self.auto_startup = auto;
315 self
316 }
317
318 pub fn with_startup_order(mut self, order: i32) -> Self {
320 self.startup_order = order;
321 self
322 }
323
324 pub fn to_info(&self) -> RouteDefinitionInfo {
327 RouteDefinitionInfo {
328 route_id: self.route_id.clone(),
329 auto_startup: self.auto_startup,
330 startup_order: self.startup_order,
331 }
332 }
333}
334
/// Cloneable summary of a route definition: its id, auto-startup flag and
/// startup order, without steps or configurations.
///
/// Every field is a plain value (`String`, `bool`, `i32`), so the type also
/// derives `Debug`, `PartialEq` and `Eq`. Instances can therefore be logged
/// and compared in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Route identifier.
    route_id: String,
    /// Auto-startup flag.
    auto_startup: bool,
    /// Startup order.
    startup_order: i32,
}

impl RouteDefinitionInfo {
    /// Returns the route id.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Returns the auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Returns the startup order.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }
}
363
364pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
366 if processors.is_empty() {
367 return BoxProcessor::new(IdentityProcessor);
368 }
369 BoxProcessor::new(SequentialPipeline { steps: processors })
370}
371
372pub fn compose_traced_pipeline(
377 processors: Vec<BoxProcessor>,
378 route_id: &str,
379 trace_enabled: bool,
380 detail_level: DetailLevel,
381) -> BoxProcessor {
382 if !trace_enabled {
383 return compose_pipeline(processors);
384 }
385
386 if processors.is_empty() {
387 return BoxProcessor::new(IdentityProcessor);
388 }
389
390 let wrapped: Vec<BoxProcessor> = processors
392 .into_iter()
393 .enumerate()
394 .map(|(idx, processor)| {
395 BoxProcessor::new(TracingProcessor::new(
396 processor,
397 route_id.to_string(),
398 idx,
399 detail_level.clone(),
400 ))
401 })
402 .collect();
403
404 BoxProcessor::new(SequentialPipeline { steps: wrapped })
405}
406
407#[derive(Clone)]
409struct SequentialPipeline {
410 steps: Vec<BoxProcessor>,
411}
412
413impl Service<Exchange> for SequentialPipeline {
414 type Response = Exchange;
415 type Error = CamelError;
416 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
417
418 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
419 if let Some(first) = self.steps.first_mut() {
420 first.poll_ready(cx)
421 } else {
422 Poll::Ready(Ok(()))
423 }
424 }
425
426 fn call(&mut self, exchange: Exchange) -> Self::Future {
427 let mut steps = self.steps.clone();
428 Box::pin(async move {
429 let mut ex = exchange;
430 for step in &mut steps {
431 ex = step.ready().await?.call(ex).await?;
432 }
433 Ok(ex)
434 })
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::BoxProcessorExt;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    // Test service: `poll_ready` returns `Pending` while the shared flag is
    // unset (setting it as a side effect) and `Ready` once it is set.
    #[derive(Clone)]
    struct DelayedReadyService {
        ready: Arc<AtomicBool>,
    }

    impl Service<Exchange> for DelayedReadyService {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // `fetch_or` sets the flag and hands back its previous value.
            let was_ready = self.ready.fetch_or(true, Ordering::SeqCst);
            if !was_ready {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, ex: Exchange) -> Self::Future {
            Box::pin(async move { Ok(ex) })
        }
    }

    #[test]
    fn test_pipeline_poll_ready_delegates_to_first_step() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let flag = Arc::new(AtomicBool::new(false));
        let delayed = BoxProcessor::new(DelayedReadyService { ready: flag });
        let mut pipeline = SequentialPipeline {
            steps: vec![delayed],
        };

        // The inner service is not ready on the first poll...
        let first_poll = pipeline.poll_ready(&mut cx);
        assert!(first_poll.is_pending(), "expected Pending on first poll_ready");

        // ...and ready on the second one.
        let second_poll = pipeline.poll_ready(&mut cx);
        assert!(second_poll.is_ready(), "expected Ready on second poll_ready");
    }

    #[test]
    fn test_pipeline_poll_ready_with_empty_steps() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut pipeline = SequentialPipeline { steps: Vec::new() };

        let outcome = pipeline.poll_ready(&mut cx);
        assert!(outcome.is_ready(), "expected Ready for empty pipeline");
    }

    #[tokio::test]
    async fn test_pipeline_stops_gracefully_on_stopped_error() {
        // Records whether the step placed after the failing one ever ran.
        let after_called = Arc::new(AtomicBool::new(false));
        let after_flag = Arc::clone(&after_called);

        let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
        let after_step = BoxProcessor::from_fn(move |ex| {
            after_flag.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let mut pipeline = SequentialPipeline {
            steps: vec![stop_step, after_step],
        };

        let input = Exchange::new(camel_api::Message::new("hello"));
        let result = pipeline.call(input).await;

        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        assert!(
            !after_called.load(Ordering::SeqCst),
            "step after stop should not be called"
        );
    }

    #[test]
    fn test_builder_step_multicast_variant() {
        let branches = vec![BuilderStep::To("direct:a".into())];
        let step = BuilderStep::Multicast {
            steps: branches,
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    #[test]
    fn test_route_definition_defaults() {
        // Only the id is set explicitly; the other two values are defaults.
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");

        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
    }

    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);

        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    #[test]
    fn test_choice_builder_step_debug() {
        fn always_true(_: &Exchange) -> bool {
            true
        }

        let clause = WhenStep {
            predicate: Arc::new(always_true) as FilterPredicate,
            steps: vec![BuilderStep::To("mock:a".into())],
        };
        let step = BuilderStep::Choice {
            whens: vec![clause],
            otherwise: None,
        };

        let rendered = format!("{step:?}");
        assert!(rendered.contains("Choice"));
    }

    #[tokio::test]
    async fn test_compose_traced_pipeline_disabled() {
        let pipeline = compose_traced_pipeline(vec![], "test-route", false, DetailLevel::Minimal);
        let input = Exchange::new(camel_api::Message::new("hello"));

        let outcome = tower::ServiceExt::oneshot(pipeline, input).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_compose_traced_pipeline_enabled() {
        let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let pipeline =
            compose_traced_pipeline(vec![passthrough], "test-route", true, DetailLevel::Minimal);
        let input = Exchange::new(camel_api::Message::new("hello"));

        let outcome = tower::ServiceExt::oneshot(pipeline, input).await;
        assert!(outcome.is_ok());
    }
}