1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::loop_eip::LoopConfig;
8use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
9use camel_component_api::ConcurrencyModel;
10
/// One `when` branch of a [`BuilderStep::Choice`]: a closure-based predicate
/// paired with the sub-steps stored for that branch.
///
/// NOTE(review): presumably the steps run only when the predicate matches —
/// confirm against the code that consumes `Choice`.
pub struct WhenStep {
    /// Predicate associated with this branch.
    pub predicate: FilterPredicate,
    /// Sub-steps belonging to this branch.
    pub steps: Vec<BuilderStep>,
}
16
17pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
18
/// One `when` branch of a [`BuilderStep::DeclarativeChoice`]: the predicate is
/// described by a [`LanguageExpressionDef`] rather than a closure.
#[derive(Debug)]
pub struct DeclarativeWhenStep {
    /// Language expression describing this branch's predicate.
    pub predicate: LanguageExpressionDef,
    /// Sub-steps belonging to this branch.
    pub steps: Vec<BuilderStep>,
}
25
/// A single step of a route as recorded by the builder.
///
/// Variants prefixed with `Declarative` carry [`LanguageExpressionDef`] /
/// [`ValueSourceDef`] descriptions (or plain strings and flags) instead of
/// ready-made closures or config objects.
/// NOTE(review): presumably these are resolved into their programmatic
/// counterparts at a later stage — confirm against the consumer of this enum.
///
/// `Debug` is implemented by hand for this type (not derived).
pub enum BuilderStep {
    /// An already-constructed processor.
    Processor(BoxProcessor),
    /// Endpoint URI string, e.g. `"mock:out"`.
    To(String),
    /// Payload-free marker step.
    /// NOTE(review): presumably halts further routing — confirm.
    Stop,
    /// Log step with a fixed message string.
    Log {
        /// Log level for the message.
        level: camel_processor::LogLevel,
        /// Literal message text.
        message: String,
    },
    /// Header assignment described declaratively.
    DeclarativeSetHeader {
        /// Header name.
        key: String,
        /// Source of the header value.
        value: ValueSourceDef,
    },
    /// Body assignment described declaratively.
    DeclarativeSetBody {
        /// Source of the new body value.
        value: ValueSourceDef,
    },
    /// Filter whose predicate is a language expression.
    DeclarativeFilter {
        /// Language expression describing the predicate.
        predicate: LanguageExpressionDef,
        /// Sub-steps nested under the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice whose branches use language-expression predicates.
    DeclarativeChoice {
        /// The `when` branches.
        whens: Vec<DeclarativeWhenStep>,
        /// Optional fallback branch.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Script step described by a language expression.
    DeclarativeScript {
        /// Expression (language + source) of the script.
        expression: LanguageExpressionDef,
    },
    /// Split step described declaratively.
    DeclarativeSplit {
        /// Language expression driving the split.
        expression: LanguageExpressionDef,
        /// Aggregation strategy for the split.
        aggregation: camel_api::splitter::AggregationStrategy,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit associated with parallel processing.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Sub-steps nested under the split.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic router described declaratively.
    DeclarativeDynamicRouter {
        /// Language expression driving the router.
        expression: LanguageExpressionDef,
        /// Delimiter string for URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; meaning of non-positive values is not
        /// visible here — TODO confirm).
        cache_size: i32,
        /// Ignore-invalid-endpoints flag.
        ignore_invalid_endpoints: bool,
        /// Maximum-iterations setting.
        max_iterations: usize,
    },
    /// Routing slip described declaratively.
    DeclarativeRoutingSlip {
        /// Language expression driving the slip.
        expression: LanguageExpressionDef,
        /// Delimiter string for URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; meaning of non-positive values is not
        /// visible here — TODO confirm).
        cache_size: i32,
        /// Ignore-invalid-endpoints flag.
        ignore_invalid_endpoints: bool,
    },
    /// Split step with a ready-made [`SplitterConfig`].
    Split {
        /// Splitter configuration.
        config: SplitterConfig,
        /// Sub-steps nested under the split.
        steps: Vec<BuilderStep>,
    },
    /// Aggregate step with a ready-made [`AggregatorConfig`].
    Aggregate {
        /// Aggregator configuration.
        config: AggregatorConfig,
    },
    /// Filter with a closure-based predicate.
    Filter {
        /// Predicate for the filter.
        predicate: FilterPredicate,
        /// Sub-steps nested under the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice with closure-based predicates.
    Choice {
        /// The `when` branches.
        whens: Vec<WhenStep>,
        /// Optional fallback branch.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Wire-tap step.
    WireTap {
        /// Endpoint URI of the tap.
        uri: String,
    },
    /// Multicast step.
    Multicast {
        /// Sub-steps nested under the multicast.
        steps: Vec<BuilderStep>,
        /// Multicast configuration.
        config: MulticastConfig,
    },
    /// Log step whose message comes from a [`ValueSourceDef`].
    DeclarativeLog {
        /// Log level for the message.
        level: camel_processor::LogLevel,
        /// Source of the message.
        message: ValueSourceDef,
    },
    /// Bean invocation identified by name and method.
    Bean {
        /// Bean name.
        name: String,
        /// Method name.
        method: String,
    },
    /// Script step given as a language name plus script text.
    Script {
        /// Script language name.
        language: String,
        /// Script source text.
        script: String,
    },
    /// Throttle step.
    Throttle {
        /// Throttler configuration.
        config: camel_api::ThrottlerConfig,
        /// Sub-steps nested under the throttle.
        steps: Vec<BuilderStep>,
    },
    /// Load-balance step.
    LoadBalance {
        /// Load-balancer configuration.
        config: camel_api::LoadBalancerConfig,
        /// Sub-steps nested under the load balancer.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic router with a ready-made configuration.
    DynamicRouter {
        /// Dynamic-router configuration.
        config: camel_api::DynamicRouterConfig,
    },
    /// Routing slip with a ready-made configuration.
    RoutingSlip {
        /// Routing-slip configuration.
        config: camel_api::RoutingSlipConfig,
    },
    /// Recipient list with a ready-made configuration.
    RecipientList {
        /// Recipient-list configuration.
        config: camel_api::recipient_list::RecipientListConfig,
    },
    /// Recipient list described declaratively.
    DeclarativeRecipientList {
        /// Language expression driving the recipient list.
        expression: LanguageExpressionDef,
        /// Delimiter string.
        delimiter: String,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit associated with parallel processing.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Aggregation setting, kept as a string (accepted values are not
        /// visible here — TODO confirm).
        aggregation: String,
    },
    /// Delay step.
    Delay {
        /// Delay configuration.
        config: camel_api::DelayConfig,
    },
    /// Loop step with a ready-made [`LoopConfig`].
    Loop {
        /// Loop configuration.
        config: LoopConfig,
        /// Sub-steps nested under the loop.
        steps: Vec<BuilderStep>,
    },
    /// Loop step described declaratively.
    DeclarativeLoop {
        /// Optional iteration count.
        count: Option<usize>,
        /// Optional language expression used as a `while` predicate.
        while_predicate: Option<LanguageExpressionDef>,
        /// Sub-steps nested under the loop.
        steps: Vec<BuilderStep>,
    },
}
172
173impl std::fmt::Debug for BuilderStep {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
177 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
178 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
179 BuilderStep::Log { level, message } => write!(
180 f,
181 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
182 ),
183 BuilderStep::DeclarativeSetHeader { key, .. } => {
184 write!(
185 f,
186 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
187 )
188 }
189 BuilderStep::DeclarativeSetBody { .. } => {
190 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
191 }
192 BuilderStep::DeclarativeFilter { steps, .. } => {
193 write!(
194 f,
195 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
196 )
197 }
198 BuilderStep::DeclarativeChoice { whens, otherwise } => {
199 write!(
200 f,
201 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
202 whens.len(),
203 if otherwise.is_some() { "Some" } else { "None" }
204 )
205 }
206 BuilderStep::DeclarativeScript { expression } => write!(
207 f,
208 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
209 expression.language
210 ),
211 BuilderStep::DeclarativeSplit { steps, .. } => {
212 write!(
213 f,
214 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
215 )
216 }
217 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
218 f,
219 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
220 expression.language
221 ),
222 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
223 f,
224 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
225 expression.language
226 ),
227 BuilderStep::Split { steps, .. } => {
228 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
229 }
230 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
231 BuilderStep::Filter { steps, .. } => {
232 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
233 }
234 BuilderStep::Choice { whens, otherwise } => {
235 write!(
236 f,
237 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
238 whens.len(),
239 if otherwise.is_some() { "Some" } else { "None" }
240 )
241 }
242 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
243 BuilderStep::Multicast { steps, .. } => {
244 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
245 }
246 BuilderStep::DeclarativeLog { level, .. } => {
247 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
248 }
249 BuilderStep::Bean { name, method } => {
250 write!(
251 f,
252 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
253 )
254 }
255 BuilderStep::Script { language, .. } => {
256 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
257 }
258 BuilderStep::Throttle { steps, .. } => {
259 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
260 }
261 BuilderStep::LoadBalance { steps, .. } => {
262 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
263 }
264 BuilderStep::DynamicRouter { .. } => {
265 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
266 }
267 BuilderStep::RoutingSlip { .. } => {
268 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
269 }
270 BuilderStep::RecipientList { .. } => {
271 write!(f, "BuilderStep::RecipientList {{ .. }}")
272 }
273 BuilderStep::DeclarativeRecipientList {
274 expression,
275 aggregation,
276 ..
277 } => write!(
278 f,
279 "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
280 expression.language, aggregation
281 ),
282 BuilderStep::Delay { config } => {
283 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
284 }
285 BuilderStep::Loop { config, steps } => {
286 write!(
287 f,
288 "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
289 config.mode_name(),
290 steps.len()
291 )
292 }
293 BuilderStep::DeclarativeLoop {
294 count,
295 while_predicate,
296 steps,
297 } => {
298 write!(
299 f,
300 "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
301 count,
302 while_predicate.is_some(),
303 steps.len()
304 )
305 }
306 }
307 }
308}
309
/// Builder-side description of a route: a source URI, its ordered steps and
/// optional per-route settings.
///
/// All fields are crate-private; outside the crate they are read through the
/// accessor methods and set through the `with_*` builder methods.
pub struct RouteDefinition {
    /// Source URI the route is defined from.
    pub(crate) from_uri: String,
    /// Steps of the route, in declaration order.
    pub(crate) steps: Vec<BuilderStep>,
    /// Optional error-handler configuration (`None` after `new`).
    pub(crate) error_handler: Option<ErrorHandlerConfig>,
    /// Optional circuit-breaker configuration (`None` after `new`).
    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
    /// Optional unit-of-work configuration (`None` after `new`).
    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
    /// Optional concurrency model for this route (`None` after `new`).
    pub(crate) concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `new` leaves it as an empty string.
    pub(crate) route_id: String,
    /// Auto-startup flag; `new` sets it to `true`.
    pub(crate) auto_startup: bool,
    /// Startup order; `new` sets it to `1000`.
    pub(crate) startup_order: i32,
    /// Optional hash of the route's source (`None` after `new`).
    pub(crate) source_hash: Option<u64>,
}
331
332impl RouteDefinition {
333 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
335 Self {
336 from_uri: from_uri.into(),
337 steps,
338 error_handler: None,
339 circuit_breaker: None,
340 unit_of_work: None,
341 concurrency: None,
342 route_id: String::new(), auto_startup: true,
344 startup_order: 1000,
345 source_hash: None,
346 }
347 }
348
349 pub fn from_uri(&self) -> &str {
351 &self.from_uri
352 }
353
354 pub fn steps(&self) -> &[BuilderStep] {
356 &self.steps
357 }
358
359 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
361 self.error_handler = Some(config);
362 self
363 }
364
365 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
367 self.error_handler.as_ref()
368 }
369
370 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
372 self.circuit_breaker = Some(config);
373 self
374 }
375
376 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
378 self.unit_of_work = Some(config);
379 self
380 }
381
382 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
384 self.unit_of_work.as_ref()
385 }
386
387 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
389 self.circuit_breaker.as_ref()
390 }
391
392 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
394 self.concurrency.as_ref()
395 }
396
397 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
399 self.concurrency = Some(model);
400 self
401 }
402
403 pub fn route_id(&self) -> &str {
405 &self.route_id
406 }
407
408 pub fn auto_startup(&self) -> bool {
410 self.auto_startup
411 }
412
413 pub fn startup_order(&self) -> i32 {
415 self.startup_order
416 }
417
418 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
420 self.route_id = id.into();
421 self
422 }
423
424 pub fn with_auto_startup(mut self, auto: bool) -> Self {
426 self.auto_startup = auto;
427 self
428 }
429
430 pub fn with_startup_order(mut self, order: i32) -> Self {
432 self.startup_order = order;
433 self
434 }
435
436 pub fn with_source_hash(mut self, hash: u64) -> Self {
437 self.source_hash = Some(hash);
438 self
439 }
440
441 pub fn source_hash(&self) -> Option<u64> {
442 self.source_hash
443 }
444
445 pub fn to_info(&self) -> RouteDefinitionInfo {
448 RouteDefinitionInfo {
449 route_id: self.route_id.clone(),
450 auto_startup: self.auto_startup,
451 startup_order: self.startup_order,
452 source_hash: self.source_hash,
453 }
454 }
455}
456
/// Cloneable snapshot of a route's metadata (id, auto-startup flag, startup
/// order and optional source hash) without its steps or configuration objects.
///
/// `Debug` is derived so the snapshot can be logged with `{:?}` and embedded
/// in other `Debug` types.
#[derive(Debug, Clone)]
pub struct RouteDefinitionInfo {
    /// Route identifier.
    route_id: String,
    /// Auto-startup flag.
    auto_startup: bool,
    /// Startup order.
    startup_order: i32,
    /// Optional hash of the route's source.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Route identifier.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Startup order.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Source hash, if one was recorded.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
490
#[cfg(test)]
mod tests {
    use super::*;

    /// Expression definition shared by the declarative-variant tests.
    fn simple_body_expr() -> LanguageExpressionDef {
        LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        }
    }

    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);

        // Every optional setting must be unset on a fresh definition.
        assert!(def.error_handler_config().is_none());
        assert!(def.circuit_breaker_config().is_none());
        assert!(def.unit_of_work_config().is_none());
        assert!(def.concurrency_override().is_none());
        assert!(def.source_hash().is_none());

        // Without `with_route_id` the id stays empty.
        let unnamed = RouteDefinition::new("direct:test", vec![]);
        assert_eq!(unnamed.route_id(), "");
    }

    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let expr = simple_body_expr();

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeDynamicRouter {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 16,
                ignore_invalid_endpoints: false,
                max_iterations: 10,
            },
            BuilderStep::DeclarativeRoutingSlip {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 16,
                ignore_invalid_endpoints: true,
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::Multicast {
                steps: vec![BuilderStep::To("mock:m1".into())],
                config: MulticastConfig::new(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
            BuilderStep::DeclarativeRecipientList {
                expression: expr.clone(),
                delimiter: ",".into(),
                parallel: false,
                parallel_limit: None,
                stop_on_exception: false,
                aggregation: "last".into(),
            },
            BuilderStep::DeclarativeLoop {
                count: None,
                while_predicate: Some(expr.clone()),
                steps: vec![BuilderStep::Stop],
            },
        ];

        for step in steps {
            let dbg = format!("{step:?}");
            assert!(!dbg.is_empty());
            // Every arm of the hand-written Debug impl names the enum first.
            assert!(
                dbg.starts_with("BuilderStep::"),
                "unexpected Debug output: {dbg}"
            );
        }
    }

    #[test]
    fn test_builder_step_debug_exact_output_for_simple_variants() {
        assert_eq!(format!("{:?}", BuilderStep::Stop), "BuilderStep::Stop");
        assert_eq!(
            format!("{:?}", BuilderStep::To("mock:out".into())),
            r#"BuilderStep::To("mock:out")"#
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::WireTap {
                    uri: "mock:tap".into()
                }
            ),
            r#"BuilderStep::WireTap { uri: "mock:tap" }"#
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::Bean {
                    name: "bean".into(),
                    method: "call".into(),
                }
            ),
            r#"BuilderStep::Bean { name: "bean", method: "call" }"#
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::DeclarativeLoop {
                    count: Some(3),
                    while_predicate: None,
                    steps: vec![BuilderStep::Stop],
                }
            ),
            "BuilderStep::DeclarativeLoop { count: Some(3), while: false, steps: 1 }"
        );
        // Nested steps are rendered recursively for filter-like variants.
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::DeclarativeFilter {
                    predicate: simple_body_expr(),
                    steps: vec![BuilderStep::Stop],
                }
            ),
            "BuilderStep::DeclarativeFilter { steps: [BuilderStep::Stop], .. }"
        );
    }

    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    #[test]
    fn test_route_definition_source_hash_round_trip() {
        let plain = RouteDefinition::new("direct:test", vec![]).with_route_id("plain");
        assert_eq!(plain.source_hash(), None);
        assert_eq!(plain.to_info().source_hash(), None);

        let hashed = RouteDefinition::new("direct:test", vec![])
            .with_route_id("hashed")
            .with_source_hash(0xDEAD_BEEF);
        assert_eq!(hashed.source_hash(), Some(0xDEAD_BEEF));
        assert_eq!(hashed.to_info().source_hash(), Some(0xDEAD_BEEF));
    }

    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
        assert_eq!(
            debug,
            "BuilderStep::Choice { whens: 1 clause(s), otherwise: None }"
        );
    }

    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config.clone());
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}