1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::loop_eip::LoopConfig;
8use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
9use camel_component_api::ConcurrencyModel;
10
/// One `when` clause of a [`BuilderStep::Choice`]: a predicate paired with
/// the nested steps that belong to it.
///
/// This type has no `Debug` impl; the `Debug` impl of [`BuilderStep`] only
/// reports how many clauses a `Choice` holds.
pub struct WhenStep {
    /// Predicate attached to this clause (tests in this file build one with
    /// `Arc::new(..)` around a function taking `&Exchange` and returning `bool`).
    pub predicate: FilterPredicate,
    /// Nested steps of this clause — presumably run when `predicate` matches;
    /// confirm against the code that consumes `BuilderStep::Choice`.
    pub steps: Vec<BuilderStep>,
}
16
17pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
18
/// One `when` clause of a [`BuilderStep::DeclarativeChoice`].
///
/// Same shape as [`WhenStep`], but the predicate is a
/// [`LanguageExpressionDef`] (a language name plus source text, as built in
/// this file's tests) instead of a ready-made [`FilterPredicate`].
#[derive(Debug)]
pub struct DeclarativeWhenStep {
    /// Expression definition used as the predicate of this clause.
    pub predicate: LanguageExpressionDef,
    /// Nested steps of this clause.
    pub steps: Vec<BuilderStep>,
}
25
/// A single step of a route definition.
///
/// Variants whose name starts with `Declarative` carry definition types
/// ([`LanguageExpressionDef`] / [`ValueSourceDef`]) and plain values, while
/// their non-declarative counterparts carry config objects or predicates from
/// `camel_api`. Several variants own a nested `steps` list, which makes the
/// enum recursive.
///
/// `Debug` is implemented by hand for this enum (see the `impl` in this file)
/// and deliberately prints only a summary of most payloads.
pub enum BuilderStep {
    /// A boxed processor. Its contents are not shown by `Debug`.
    Processor(BoxProcessor),
    /// A target given as a string; tests in this file pass endpoint URIs such
    /// as `"mock:out"` and `"direct:a"`.
    To(String),
    /// Payload-free marker step — presumably halts further routing; confirm
    /// against the code that consumes this enum.
    Stop,
    /// Log step with a fixed message string.
    Log {
        /// Log level to use.
        level: camel_processor::LogLevel,
        /// Message text.
        message: String,
    },
    /// Set-header step described declaratively.
    DeclarativeSetHeader {
        /// Header key.
        key: String,
        /// Where the header value comes from (literal or expression).
        value: ValueSourceDef,
    },
    /// Set-property step described declaratively.
    ///
    /// NOTE(review): the value field is named `value_source` here but `value`
    /// in `DeclarativeSetHeader` / `DeclarativeSetBody`; renaming would break
    /// callers, so the inconsistency is only recorded.
    DeclarativeSetProperty {
        /// Property key.
        key: String,
        /// Where the property value comes from.
        value_source: ValueSourceDef,
    },
    /// Set-body step described declaratively.
    DeclarativeSetBody {
        /// Where the new body value comes from.
        value: ValueSourceDef,
    },
    /// Filter step whose predicate is an expression definition.
    DeclarativeFilter {
        /// Expression used as the filter predicate.
        predicate: LanguageExpressionDef,
        /// Nested steps of the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice step built from declarative `when` clauses.
    DeclarativeChoice {
        /// The `when` clauses.
        whens: Vec<DeclarativeWhenStep>,
        /// Optional fallback steps.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Script step given as an expression definition.
    DeclarativeScript {
        /// Language and source of the script.
        expression: LanguageExpressionDef,
    },
    /// Function step; `Debug` prints the definition's `id` and `runtime`.
    DeclarativeFunction {
        /// The function definition.
        definition: camel_api::FunctionDefinition,
    },
    /// Split step described declaratively.
    DeclarativeSplit {
        /// Expression that drives the split.
        expression: LanguageExpressionDef,
        /// Aggregation strategy for the split results.
        aggregation: camel_api::splitter::AggregationStrategy,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit paired with `parallel`.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Nested steps of the split.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic-router step described declaratively.
    DeclarativeDynamicRouter {
        /// Routing expression.
        expression: LanguageExpressionDef,
        /// Delimiter used between URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; NOTE(review): meaning of negative
        /// values is not visible here).
        cache_size: i32,
        /// Whether invalid endpoints are ignored.
        ignore_invalid_endpoints: bool,
        /// Iteration cap.
        max_iterations: usize,
    },
    /// Routing-slip step described declaratively.
    DeclarativeRoutingSlip {
        /// Routing expression.
        expression: LanguageExpressionDef,
        /// Delimiter used between URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; see note on `DeclarativeDynamicRouter`).
        cache_size: i32,
        /// Whether invalid endpoints are ignored.
        ignore_invalid_endpoints: bool,
    },
    /// Split step with a ready-made config.
    Split {
        /// Splitter configuration.
        config: SplitterConfig,
        /// Nested steps of the split.
        steps: Vec<BuilderStep>,
    },
    /// Aggregate step.
    Aggregate {
        /// Aggregator configuration.
        config: AggregatorConfig,
    },
    /// Filter step with a ready-made predicate.
    Filter {
        /// The filter predicate.
        predicate: FilterPredicate,
        /// Nested steps of the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice step built from [`WhenStep`] clauses.
    Choice {
        /// The `when` clauses.
        whens: Vec<WhenStep>,
        /// Optional fallback steps.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Wire-tap step.
    WireTap {
        /// URI of the tap target.
        uri: String,
    },
    /// Multicast step.
    Multicast {
        /// Nested steps of the multicast.
        steps: Vec<BuilderStep>,
        /// Multicast configuration.
        config: MulticastConfig,
    },
    /// Log step whose message comes from a value source.
    DeclarativeLog {
        /// Log level to use.
        level: camel_processor::LogLevel,
        /// Where the message comes from (literal or expression).
        message: ValueSourceDef,
    },
    /// Bean step identified by a bean name and a method name.
    Bean {
        /// Bean name.
        name: String,
        /// Method name.
        method: String,
    },
    /// Script step given as a language name plus script text.
    Script {
        /// Script language (the tests use `"rhai"`).
        language: String,
        /// Script source text.
        script: String,
    },
    /// Throttle step.
    Throttle {
        /// Throttler configuration.
        config: camel_api::ThrottlerConfig,
        /// Nested steps of the throttle.
        steps: Vec<BuilderStep>,
    },
    /// Load-balance step.
    LoadBalance {
        /// Load balancer configuration.
        config: camel_api::LoadBalancerConfig,
        /// Nested steps of the load balancer.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic-router step with a ready-made config.
    DynamicRouter {
        /// Dynamic router configuration.
        config: camel_api::DynamicRouterConfig,
    },
    /// Routing-slip step with a ready-made config.
    RoutingSlip {
        /// Routing slip configuration.
        config: camel_api::RoutingSlipConfig,
    },
    /// Recipient-list step with a ready-made config.
    RecipientList {
        /// Recipient list configuration.
        config: camel_api::recipient_list::RecipientListConfig,
    },
    /// Recipient-list step described declaratively.
    DeclarativeRecipientList {
        /// Expression producing the recipients.
        expression: LanguageExpressionDef,
        /// Delimiter string.
        delimiter: String,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit paired with `parallel`.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Aggregation setting, kept as a string here (unlike
        /// `DeclarativeSplit`, which uses the `AggregationStrategy` type).
        aggregation: String,
    },
    /// Delay step.
    Delay {
        /// Delay configuration.
        config: camel_api::DelayConfig,
    },
    /// Loop step with a ready-made config.
    Loop {
        /// Loop configuration.
        config: LoopConfig,
        /// Nested steps of the loop.
        steps: Vec<BuilderStep>,
    },
    /// Loop step described declaratively.
    DeclarativeLoop {
        /// Optional iteration count.
        count: Option<usize>,
        /// Optional expression used as a `while` predicate.
        while_predicate: Option<LanguageExpressionDef>,
        /// Nested steps of the loop.
        steps: Vec<BuilderStep>,
    },
}
179
180impl std::fmt::Debug for BuilderStep {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 match self {
183 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
184 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
185 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
186 BuilderStep::Log { level, message } => write!(
187 f,
188 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
189 ),
190 BuilderStep::DeclarativeSetHeader { key, .. } => {
191 write!(
192 f,
193 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
194 )
195 }
196 BuilderStep::DeclarativeSetBody { .. } => {
197 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
198 }
199 BuilderStep::DeclarativeSetProperty { key, .. } => {
200 write!(
201 f,
202 "BuilderStep::DeclarativeSetProperty {{ key: {key:?}, .. }}"
203 )
204 }
205 BuilderStep::DeclarativeFilter { steps, .. } => {
206 write!(
207 f,
208 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
209 )
210 }
211 BuilderStep::DeclarativeChoice { whens, otherwise } => {
212 write!(
213 f,
214 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
215 whens.len(),
216 if otherwise.is_some() { "Some" } else { "None" }
217 )
218 }
219 BuilderStep::DeclarativeScript { expression } => write!(
220 f,
221 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
222 expression.language
223 ),
224 BuilderStep::DeclarativeFunction { definition } => write!(
225 f,
226 "BuilderStep::DeclarativeFunction {{ id: {:?}, runtime: {:?}, .. }}",
227 definition.id, definition.runtime
228 ),
229 BuilderStep::DeclarativeSplit { steps, .. } => {
230 write!(
231 f,
232 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
233 )
234 }
235 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
236 f,
237 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
238 expression.language
239 ),
240 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
241 f,
242 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
243 expression.language
244 ),
245 BuilderStep::Split { steps, .. } => {
246 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
247 }
248 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
249 BuilderStep::Filter { steps, .. } => {
250 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
251 }
252 BuilderStep::Choice { whens, otherwise } => {
253 write!(
254 f,
255 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
256 whens.len(),
257 if otherwise.is_some() { "Some" } else { "None" }
258 )
259 }
260 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
261 BuilderStep::Multicast { steps, .. } => {
262 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
263 }
264 BuilderStep::DeclarativeLog { level, .. } => {
265 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
266 }
267 BuilderStep::Bean { name, method } => {
268 write!(
269 f,
270 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
271 )
272 }
273 BuilderStep::Script { language, .. } => {
274 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
275 }
276 BuilderStep::Throttle { steps, .. } => {
277 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
278 }
279 BuilderStep::LoadBalance { steps, .. } => {
280 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
281 }
282 BuilderStep::DynamicRouter { .. } => {
283 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
284 }
285 BuilderStep::RoutingSlip { .. } => {
286 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
287 }
288 BuilderStep::RecipientList { .. } => {
289 write!(f, "BuilderStep::RecipientList {{ .. }}")
290 }
291 BuilderStep::DeclarativeRecipientList {
292 expression,
293 aggregation,
294 ..
295 } => write!(
296 f,
297 "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
298 expression.language, aggregation
299 ),
300 BuilderStep::Delay { config } => {
301 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
302 }
303 BuilderStep::Loop { config, steps } => {
304 write!(
305 f,
306 "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
307 config.mode_name(),
308 steps.len()
309 )
310 }
311 BuilderStep::DeclarativeLoop {
312 count,
313 while_predicate,
314 steps,
315 } => {
316 write!(
317 f,
318 "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
319 count,
320 while_predicate.is_some(),
321 steps.len()
322 )
323 }
324 }
325 }
326}
327
/// A route described as a source URI plus an ordered list of
/// [`BuilderStep`]s, together with optional per-route settings.
///
/// All fields are crate-private; outside the crate the value is built with
/// [`RouteDefinition::new`] and the `with_*` methods, and read through the
/// accessor methods.
pub struct RouteDefinition {
    /// URI the route starts from.
    pub(crate) from_uri: String,
    /// Steps of the route, in order.
    pub(crate) steps: Vec<BuilderStep>,
    /// Optional error-handler configuration.
    pub(crate) error_handler: Option<ErrorHandlerConfig>,
    /// Optional circuit-breaker configuration.
    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
    /// Optional unit-of-work configuration.
    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
    /// Optional concurrency model; exposed as `concurrency_override()`.
    pub(crate) concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `new` leaves it empty until `with_route_id` is called.
    pub(crate) route_id: String,
    /// Auto-startup flag; `new` sets it to `true`.
    pub(crate) auto_startup: bool,
    /// Startup order; `new` sets it to `1000`.
    pub(crate) startup_order: i32,
    /// Optional hash value set via `with_source_hash` — presumably a hash of
    /// the source the route was loaded from; confirm with the caller.
    pub(crate) source_hash: Option<u64>,
}
349
350impl RouteDefinition {
351 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
353 Self {
354 from_uri: from_uri.into(),
355 steps,
356 error_handler: None,
357 circuit_breaker: None,
358 unit_of_work: None,
359 concurrency: None,
360 route_id: String::new(), auto_startup: true,
362 startup_order: 1000,
363 source_hash: None,
364 }
365 }
366
367 pub fn from_uri(&self) -> &str {
369 &self.from_uri
370 }
371
372 pub fn steps(&self) -> &[BuilderStep] {
374 &self.steps
375 }
376
377 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
379 self.error_handler = Some(config);
380 self
381 }
382
383 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
385 self.error_handler.as_ref()
386 }
387
388 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
390 self.circuit_breaker = Some(config);
391 self
392 }
393
394 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
396 self.unit_of_work = Some(config);
397 self
398 }
399
400 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
402 self.unit_of_work.as_ref()
403 }
404
405 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
407 self.circuit_breaker.as_ref()
408 }
409
410 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
412 self.concurrency.as_ref()
413 }
414
415 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
417 self.concurrency = Some(model);
418 self
419 }
420
421 pub fn route_id(&self) -> &str {
423 &self.route_id
424 }
425
426 pub fn auto_startup(&self) -> bool {
428 self.auto_startup
429 }
430
431 pub fn startup_order(&self) -> i32 {
433 self.startup_order
434 }
435
436 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
438 self.route_id = id.into();
439 self
440 }
441
442 pub fn with_auto_startup(mut self, auto: bool) -> Self {
444 self.auto_startup = auto;
445 self
446 }
447
448 pub fn with_startup_order(mut self, order: i32) -> Self {
450 self.startup_order = order;
451 self
452 }
453
454 pub fn with_source_hash(mut self, hash: u64) -> Self {
455 self.source_hash = Some(hash);
456 self
457 }
458
459 pub fn source_hash(&self) -> Option<u64> {
460 self.source_hash
461 }
462
463 pub fn to_info(&self) -> RouteDefinitionInfo {
466 RouteDefinitionInfo {
467 route_id: self.route_id.clone(),
468 auto_startup: self.auto_startup,
469 startup_order: self.startup_order,
470 source_hash: self.source_hash,
471 }
472 }
473}
474
/// Metadata-only view of a route: identifier, startup settings and source
/// hash, without the steps or the optional configs.
///
/// Every field is a plain std type, so the value is cheap to clone. `Debug`
/// is derived so the info can be logged and used in test diagnostics.
#[derive(Clone, Debug)]
pub struct RouteDefinitionInfo {
    /// Route identifier.
    route_id: String,
    /// Auto-startup flag.
    auto_startup: bool,
    /// Startup order.
    startup_order: i32,
    /// Optional source hash; crate-visible so it can be set in place.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Route identifier.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Startup order.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Source hash, if one was recorded.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
508
// Unit tests for the builder-step enum, its hand-written `Debug` impl, and
// the `RouteDefinition` builders/accessors defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // A `Multicast` step can be constructed and pattern-matched.
    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    // `new` defaults: auto-startup on, startup order 1000.
    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
    }

    // The `with_*` builders override the defaults.
    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    // `from_uri()` and `steps()` return what was passed to `new`.
    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    // Optional configs set through builders are visible through accessors.
    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    // Smoke test: formatting a range of variants with `{:?}` yields
    // non-empty output. Only emptiness is checked, not the exact text.
    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        // Shared expression definition reused by the declarative variants.
        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for step in steps {
            let dbg = format!("{step:?}");
            assert!(!dbg.is_empty());
        }
    }

    // `to_info` carries over route id, auto-startup and startup order.
    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    // `Debug` output of a `Choice` step mentions the variant name.
    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
    }

    // Unit-of-work config round-trips through the builder and is absent by
    // default.
    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config.clone());
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}