1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component_api::ConcurrencyModel;
9
10pub struct WhenStep {
12 pub predicate: FilterPredicate,
13 pub steps: Vec<BuilderStep>,
14}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
18#[derive(Debug)]
20pub struct DeclarativeWhenStep {
21 pub predicate: LanguageExpressionDef,
22 pub steps: Vec<BuilderStep>,
23}
24
25pub enum BuilderStep {
27 Processor(BoxProcessor),
29 To(String),
31 Stop,
33 Log {
35 level: camel_processor::LogLevel,
36 message: String,
37 },
38 DeclarativeSetHeader {
40 key: String,
41 value: ValueSourceDef,
42 },
43 DeclarativeSetBody {
45 value: ValueSourceDef,
46 },
47 DeclarativeFilter {
49 predicate: LanguageExpressionDef,
50 steps: Vec<BuilderStep>,
51 },
52 DeclarativeChoice {
54 whens: Vec<DeclarativeWhenStep>,
55 otherwise: Option<Vec<BuilderStep>>,
56 },
57 DeclarativeScript {
59 expression: LanguageExpressionDef,
60 },
61 DeclarativeSplit {
63 expression: LanguageExpressionDef,
64 aggregation: camel_api::splitter::AggregationStrategy,
65 parallel: bool,
66 parallel_limit: Option<usize>,
67 stop_on_exception: bool,
68 steps: Vec<BuilderStep>,
69 },
70 DeclarativeDynamicRouter {
71 expression: LanguageExpressionDef,
72 uri_delimiter: String,
73 cache_size: i32,
74 ignore_invalid_endpoints: bool,
75 max_iterations: usize,
76 },
77 DeclarativeRoutingSlip {
78 expression: LanguageExpressionDef,
79 uri_delimiter: String,
80 cache_size: i32,
81 ignore_invalid_endpoints: bool,
82 },
83 Split {
85 config: SplitterConfig,
86 steps: Vec<BuilderStep>,
87 },
88 Aggregate {
90 config: AggregatorConfig,
91 },
92 Filter {
94 predicate: FilterPredicate,
95 steps: Vec<BuilderStep>,
96 },
97 Choice {
100 whens: Vec<WhenStep>,
101 otherwise: Option<Vec<BuilderStep>>,
102 },
103 WireTap {
105 uri: String,
106 },
107 Multicast {
109 steps: Vec<BuilderStep>,
110 config: MulticastConfig,
111 },
112 DeclarativeLog {
114 level: camel_processor::LogLevel,
115 message: ValueSourceDef,
116 },
117 Bean {
119 name: String,
120 method: String,
121 },
122 Script {
125 language: String,
126 script: String,
127 },
128 Throttle {
130 config: camel_api::ThrottlerConfig,
131 steps: Vec<BuilderStep>,
132 },
133 LoadBalance {
135 config: camel_api::LoadBalancerConfig,
136 steps: Vec<BuilderStep>,
137 },
138 DynamicRouter {
140 config: camel_api::DynamicRouterConfig,
141 },
142 RoutingSlip {
143 config: camel_api::RoutingSlipConfig,
144 },
145 RecipientList {
146 config: camel_api::recipient_list::RecipientListConfig,
147 },
148 DeclarativeRecipientList {
149 expression: LanguageExpressionDef,
150 delimiter: String,
151 parallel: bool,
152 parallel_limit: Option<usize>,
153 stop_on_exception: bool,
154 aggregation: String,
155 },
156 Delay {
157 config: camel_api::DelayConfig,
158 },
159}
160
161impl std::fmt::Debug for BuilderStep {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
165 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
166 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
167 BuilderStep::Log { level, message } => write!(
168 f,
169 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
170 ),
171 BuilderStep::DeclarativeSetHeader { key, .. } => {
172 write!(
173 f,
174 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
175 )
176 }
177 BuilderStep::DeclarativeSetBody { .. } => {
178 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
179 }
180 BuilderStep::DeclarativeFilter { steps, .. } => {
181 write!(
182 f,
183 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
184 )
185 }
186 BuilderStep::DeclarativeChoice { whens, otherwise } => {
187 write!(
188 f,
189 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
190 whens.len(),
191 if otherwise.is_some() { "Some" } else { "None" }
192 )
193 }
194 BuilderStep::DeclarativeScript { expression } => write!(
195 f,
196 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
197 expression.language
198 ),
199 BuilderStep::DeclarativeSplit { steps, .. } => {
200 write!(
201 f,
202 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
203 )
204 }
205 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
206 f,
207 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
208 expression.language
209 ),
210 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
211 f,
212 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
213 expression.language
214 ),
215 BuilderStep::Split { steps, .. } => {
216 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
217 }
218 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
219 BuilderStep::Filter { steps, .. } => {
220 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
221 }
222 BuilderStep::Choice { whens, otherwise } => {
223 write!(
224 f,
225 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
226 whens.len(),
227 if otherwise.is_some() { "Some" } else { "None" }
228 )
229 }
230 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
231 BuilderStep::Multicast { steps, .. } => {
232 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
233 }
234 BuilderStep::DeclarativeLog { level, .. } => {
235 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
236 }
237 BuilderStep::Bean { name, method } => {
238 write!(
239 f,
240 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
241 )
242 }
243 BuilderStep::Script { language, .. } => {
244 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
245 }
246 BuilderStep::Throttle { steps, .. } => {
247 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
248 }
249 BuilderStep::LoadBalance { steps, .. } => {
250 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
251 }
252 BuilderStep::DynamicRouter { .. } => {
253 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
254 }
255 BuilderStep::RoutingSlip { .. } => {
256 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
257 }
258 BuilderStep::RecipientList { .. } => {
259 write!(f, "BuilderStep::RecipientList {{ .. }}")
260 }
261 BuilderStep::DeclarativeRecipientList {
262 expression,
263 aggregation,
264 ..
265 } => write!(
266 f,
267 "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
268 expression.language, aggregation
269 ),
270 BuilderStep::Delay { config } => {
271 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
272 }
273 }
274 }
275}
276
277pub struct RouteDefinition {
279 pub(crate) from_uri: String,
280 pub(crate) steps: Vec<BuilderStep>,
281 pub(crate) error_handler: Option<ErrorHandlerConfig>,
283 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
285 pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
287 pub(crate) concurrency: Option<ConcurrencyModel>,
290 pub(crate) route_id: String,
292 pub(crate) auto_startup: bool,
294 pub(crate) startup_order: i32,
296 pub(crate) source_hash: Option<u64>,
297}
298
299impl RouteDefinition {
300 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
302 Self {
303 from_uri: from_uri.into(),
304 steps,
305 error_handler: None,
306 circuit_breaker: None,
307 unit_of_work: None,
308 concurrency: None,
309 route_id: String::new(), auto_startup: true,
311 startup_order: 1000,
312 source_hash: None,
313 }
314 }
315
316 pub fn from_uri(&self) -> &str {
318 &self.from_uri
319 }
320
321 pub fn steps(&self) -> &[BuilderStep] {
323 &self.steps
324 }
325
326 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
328 self.error_handler = Some(config);
329 self
330 }
331
332 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
334 self.error_handler.as_ref()
335 }
336
337 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
339 self.circuit_breaker = Some(config);
340 self
341 }
342
343 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
345 self.unit_of_work = Some(config);
346 self
347 }
348
349 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
351 self.unit_of_work.as_ref()
352 }
353
354 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
356 self.circuit_breaker.as_ref()
357 }
358
359 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
361 self.concurrency.as_ref()
362 }
363
364 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
366 self.concurrency = Some(model);
367 self
368 }
369
370 pub fn route_id(&self) -> &str {
372 &self.route_id
373 }
374
375 pub fn auto_startup(&self) -> bool {
377 self.auto_startup
378 }
379
380 pub fn startup_order(&self) -> i32 {
382 self.startup_order
383 }
384
385 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
387 self.route_id = id.into();
388 self
389 }
390
391 pub fn with_auto_startup(mut self, auto: bool) -> Self {
393 self.auto_startup = auto;
394 self
395 }
396
397 pub fn with_startup_order(mut self, order: i32) -> Self {
399 self.startup_order = order;
400 self
401 }
402
403 pub fn with_source_hash(mut self, hash: u64) -> Self {
404 self.source_hash = Some(hash);
405 self
406 }
407
408 pub fn source_hash(&self) -> Option<u64> {
409 self.source_hash
410 }
411
412 pub fn to_info(&self) -> RouteDefinitionInfo {
415 RouteDefinitionInfo {
416 route_id: self.route_id.clone(),
417 auto_startup: self.auto_startup,
418 startup_order: self.startup_order,
419 source_hash: self.source_hash,
420 }
421 }
422}
423
424#[derive(Clone)]
430pub struct RouteDefinitionInfo {
431 route_id: String,
432 auto_startup: bool,
433 startup_order: i32,
434 pub(crate) source_hash: Option<u64>,
435}
436
437impl RouteDefinitionInfo {
438 pub fn route_id(&self) -> &str {
440 &self.route_id
441 }
442
443 pub fn auto_startup(&self) -> bool {
445 self.auto_startup
446 }
447
448 pub fn startup_order(&self) -> i32 {
450 self.startup_order
451 }
452
453 pub fn source_hash(&self) -> Option<u64> {
454 self.source_hash
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[test]
463 fn test_builder_step_multicast_variant() {
464 use camel_api::MulticastConfig;
465
466 let step = BuilderStep::Multicast {
467 steps: vec![BuilderStep::To("direct:a".into())],
468 config: MulticastConfig::new(),
469 };
470
471 assert!(matches!(step, BuilderStep::Multicast { .. }));
472 }
473
474 #[test]
475 fn test_route_definition_defaults() {
476 let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
477 assert_eq!(def.route_id(), "test-route");
478 assert!(def.auto_startup());
479 assert_eq!(def.startup_order(), 1000);
480 }
481
482 #[test]
483 fn test_route_definition_builders() {
484 let def = RouteDefinition::new("direct:test", vec![])
485 .with_route_id("my-route")
486 .with_auto_startup(false)
487 .with_startup_order(50);
488 assert_eq!(def.route_id(), "my-route");
489 assert!(!def.auto_startup());
490 assert_eq!(def.startup_order(), 50);
491 }
492
493 #[test]
494 fn test_route_definition_accessors_cover_core_fields() {
495 let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
496 .with_route_id("accessor-route");
497
498 assert_eq!(def.from_uri(), "direct:in");
499 assert_eq!(def.steps().len(), 1);
500 assert!(matches!(def.steps()[0], BuilderStep::To(_)));
501 }
502
503 #[test]
504 fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
505 use camel_api::circuit_breaker::CircuitBreakerConfig;
506 use camel_api::error_handler::ErrorHandlerConfig;
507 use camel_component_api::ConcurrencyModel;
508
509 let def = RouteDefinition::new("direct:test", vec![])
510 .with_route_id("eh-route")
511 .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
512 .with_circuit_breaker(CircuitBreakerConfig::new())
513 .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });
514
515 let eh = def
516 .error_handler_config()
517 .expect("error handler should be set");
518 assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
519 assert!(def.circuit_breaker_config().is_some());
520 assert!(matches!(
521 def.concurrency_override(),
522 Some(ConcurrencyModel::Concurrent { max: Some(4) })
523 ));
524 }
525
526 #[test]
527 fn test_builder_step_debug_covers_many_variants() {
528 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
529 use camel_api::{
530 DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
531 };
532 use std::sync::Arc;
533
534 let expr = LanguageExpressionDef {
535 language: "simple".into(),
536 source: "${body}".into(),
537 };
538
539 let steps = vec![
540 BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
541 BuilderStep::To("mock:out".into()),
542 BuilderStep::Stop,
543 BuilderStep::Log {
544 level: camel_processor::LogLevel::Info,
545 message: "hello".into(),
546 },
547 BuilderStep::DeclarativeSetHeader {
548 key: "k".into(),
549 value: ValueSourceDef::Literal(Value::String("v".into())),
550 },
551 BuilderStep::DeclarativeSetBody {
552 value: ValueSourceDef::Expression(expr.clone()),
553 },
554 BuilderStep::DeclarativeFilter {
555 predicate: expr.clone(),
556 steps: vec![BuilderStep::Stop],
557 },
558 BuilderStep::DeclarativeChoice {
559 whens: vec![DeclarativeWhenStep {
560 predicate: expr.clone(),
561 steps: vec![BuilderStep::Stop],
562 }],
563 otherwise: Some(vec![BuilderStep::Stop]),
564 },
565 BuilderStep::DeclarativeScript {
566 expression: expr.clone(),
567 },
568 BuilderStep::DeclarativeSplit {
569 expression: expr.clone(),
570 aggregation: AggregationStrategy::Original,
571 parallel: false,
572 parallel_limit: Some(2),
573 stop_on_exception: true,
574 steps: vec![BuilderStep::Stop],
575 },
576 BuilderStep::Split {
577 config: SplitterConfig::new(split_body_lines()),
578 steps: vec![BuilderStep::Stop],
579 },
580 BuilderStep::Aggregate {
581 config: camel_api::AggregatorConfig::correlate_by("id")
582 .complete_when_size(1)
583 .build(),
584 },
585 BuilderStep::Filter {
586 predicate: Arc::new(|_: &Exchange| true),
587 steps: vec![BuilderStep::Stop],
588 },
589 BuilderStep::WireTap {
590 uri: "mock:tap".into(),
591 },
592 BuilderStep::DeclarativeLog {
593 level: camel_processor::LogLevel::Info,
594 message: ValueSourceDef::Expression(expr.clone()),
595 },
596 BuilderStep::Bean {
597 name: "bean".into(),
598 method: "call".into(),
599 },
600 BuilderStep::Script {
601 language: "rhai".into(),
602 script: "body".into(),
603 },
604 BuilderStep::Throttle {
605 config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
606 steps: vec![BuilderStep::Stop],
607 },
608 BuilderStep::LoadBalance {
609 config: camel_api::LoadBalancerConfig::round_robin(),
610 steps: vec![BuilderStep::To("mock:l1".into())],
611 },
612 BuilderStep::DynamicRouter {
613 config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
614 },
615 BuilderStep::RoutingSlip {
616 config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
617 },
618 ];
619
620 for step in steps {
621 let dbg = format!("{step:?}");
622 assert!(!dbg.is_empty());
623 }
624 }
625
626 #[test]
627 fn test_route_definition_to_info_preserves_metadata() {
628 let info = RouteDefinition::new("direct:test", vec![])
629 .with_route_id("meta-route")
630 .with_auto_startup(false)
631 .with_startup_order(7)
632 .to_info();
633
634 assert_eq!(info.route_id(), "meta-route");
635 assert!(!info.auto_startup());
636 assert_eq!(info.startup_order(), 7);
637 }
638
639 #[test]
640 fn test_choice_builder_step_debug() {
641 use camel_api::{Exchange, FilterPredicate};
642 use std::sync::Arc;
643
644 fn always_true(_: &Exchange) -> bool {
645 true
646 }
647
648 let step = BuilderStep::Choice {
649 whens: vec![WhenStep {
650 predicate: Arc::new(always_true) as FilterPredicate,
651 steps: vec![BuilderStep::To("mock:a".into())],
652 }],
653 otherwise: None,
654 };
655 let debug = format!("{step:?}");
656 assert!(debug.contains("Choice"));
657 }
658
659 #[test]
660 fn test_route_definition_unit_of_work() {
661 use camel_api::UnitOfWorkConfig;
662 let config = UnitOfWorkConfig {
663 on_complete: Some("log:complete".into()),
664 on_failure: Some("log:failed".into()),
665 };
666 let def = RouteDefinition::new("direct:test", vec![])
667 .with_route_id("uow-test")
668 .with_unit_of_work(config.clone());
669 assert_eq!(
670 def.unit_of_work_config().unwrap().on_complete.as_deref(),
671 Some("log:complete")
672 );
673 assert_eq!(
674 def.unit_of_work_config().unwrap().on_failure.as_deref(),
675 Some("log:failed")
676 );
677
678 let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
679 assert!(def_no_uow.unit_of_work_config().is_none());
680 }
681}