1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::loop_eip::LoopConfig;
8use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
9use camel_component_api::ConcurrencyModel;
10
11pub struct WhenStep {
13 pub predicate: FilterPredicate,
14 pub steps: Vec<BuilderStep>,
15}
16
17pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
18
/// One `when` branch of a [`BuilderStep::DeclarativeChoice`].
///
/// Unlike [`WhenStep`], the predicate is kept as a [`LanguageExpressionDef`]
/// (a language name plus source text) rather than as a ready-made predicate.
#[derive(Debug)]
pub struct DeclarativeWhenStep {
    /// Expression definition used as this branch's predicate.
    pub predicate: LanguageExpressionDef,
    /// Steps attached to this branch.
    pub steps: Vec<BuilderStep>,
}
25
/// A single step stored in a [`RouteDefinition`]'s step list.
///
/// Variants whose name starts with `Declarative` describe their expressions
/// and values as [`LanguageExpressionDef`] / [`ValueSourceDef`] data, whereas
/// the other variants carry ready-made configs, predicates or processors.
/// How each step is turned into runtime behavior is not visible in this file.
pub enum BuilderStep {
    /// An already-boxed processor.
    Processor(BoxProcessor),
    /// A target given as a string; the `Debug` impl and the tests in this file
    /// treat it as an endpoint URI (e.g. `"mock:out"`).
    To(String),
    /// Payload-free marker variant (presumably halts routing — confirm at the
    /// site that consumes the steps).
    Stop,
    /// Log step with a level and a literal message string.
    Log {
        /// Level the message is logged at.
        level: camel_processor::LogLevel,
        /// Message text.
        message: String,
    },
    /// Header assignment: `key` receives the value described by `value`.
    DeclarativeSetHeader {
        /// Header name.
        key: String,
        /// Description of where the header value comes from.
        value: ValueSourceDef,
    },
    /// Body assignment from the value described by `value`.
    DeclarativeSetBody {
        /// Description of where the body value comes from.
        value: ValueSourceDef,
    },
    /// Filter whose predicate is an expression definition.
    DeclarativeFilter {
        /// Predicate expression.
        predicate: LanguageExpressionDef,
        /// Nested steps attached to the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice made of declarative `when` branches plus an optional fallback.
    DeclarativeChoice {
        /// The `when` branches.
        whens: Vec<DeclarativeWhenStep>,
        /// Steps of the `otherwise` branch, if one was given.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Script described by an expression definition.
    DeclarativeScript {
        /// Language and source of the script.
        expression: LanguageExpressionDef,
    },
    /// Split whose split expression is an expression definition.
    DeclarativeSplit {
        /// Expression describing how to split.
        expression: LanguageExpressionDef,
        /// Aggregation strategy for the split.
        aggregation: camel_api::splitter::AggregationStrategy,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit that accompanies `parallel`.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Nested steps attached to the split.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic router driven by an expression definition.
    DeclarativeDynamicRouter {
        /// Routing expression.
        expression: LanguageExpressionDef,
        /// Delimiter string for URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; the meaning of negative values is not
        /// visible here).
        cache_size: i32,
        /// Whether invalid endpoints are ignored.
        ignore_invalid_endpoints: bool,
        /// Iteration limit.
        max_iterations: usize,
    },
    /// Routing slip driven by an expression definition.
    DeclarativeRoutingSlip {
        /// Routing expression.
        expression: LanguageExpressionDef,
        /// Delimiter string for URIs.
        uri_delimiter: String,
        /// Cache size setting (signed; the meaning of negative values is not
        /// visible here).
        cache_size: i32,
        /// Whether invalid endpoints are ignored.
        ignore_invalid_endpoints: bool,
    },
    /// Split configured through a ready-made [`SplitterConfig`].
    Split {
        /// Splitter configuration.
        config: SplitterConfig,
        /// Nested steps attached to the split.
        steps: Vec<BuilderStep>,
    },
    /// Aggregation configured through an [`AggregatorConfig`].
    Aggregate {
        /// Aggregator configuration.
        config: AggregatorConfig,
    },
    /// Filter with a ready-made predicate.
    Filter {
        /// Predicate attached to the filter.
        predicate: FilterPredicate,
        /// Nested steps attached to the filter.
        steps: Vec<BuilderStep>,
    },
    /// Choice made of [`WhenStep`] branches plus an optional fallback.
    Choice {
        /// The `when` branches.
        whens: Vec<WhenStep>,
        /// Steps of the `otherwise` branch, if one was given.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Wire tap identified by a URI string.
    WireTap {
        /// URI of the tap.
        uri: String,
    },
    /// Multicast over nested steps.
    Multicast {
        /// Nested steps attached to the multicast.
        steps: Vec<BuilderStep>,
        /// Multicast configuration.
        config: MulticastConfig,
    },
    /// Log step whose message is described by a [`ValueSourceDef`].
    DeclarativeLog {
        /// Level the message is logged at.
        level: camel_processor::LogLevel,
        /// Description of where the message comes from.
        message: ValueSourceDef,
    },
    /// Bean reference by name and method.
    Bean {
        /// Bean name.
        name: String,
        /// Method name.
        method: String,
    },
    /// Script given as a language name and script source.
    Script {
        /// Script language name.
        language: String,
        /// Script source text.
        script: String,
    },
    /// Throttle configured through a `ThrottlerConfig`.
    Throttle {
        /// Throttler configuration.
        config: camel_api::ThrottlerConfig,
        /// Nested steps attached to the throttle.
        steps: Vec<BuilderStep>,
    },
    /// Load balancer configured through a `LoadBalancerConfig`.
    LoadBalance {
        /// Load-balancer configuration.
        config: camel_api::LoadBalancerConfig,
        /// Nested steps attached to the load balancer.
        steps: Vec<BuilderStep>,
    },
    /// Dynamic router with a ready-made configuration.
    DynamicRouter {
        /// Dynamic-router configuration.
        config: camel_api::DynamicRouterConfig,
    },
    /// Routing slip with a ready-made configuration.
    RoutingSlip {
        /// Routing-slip configuration.
        config: camel_api::RoutingSlipConfig,
    },
    /// Recipient list with a ready-made configuration.
    RecipientList {
        /// Recipient-list configuration.
        config: camel_api::recipient_list::RecipientListConfig,
    },
    /// Recipient list driven by an expression definition.
    DeclarativeRecipientList {
        /// Expression describing the recipients.
        expression: LanguageExpressionDef,
        /// Delimiter string.
        delimiter: String,
        /// Parallel-processing flag.
        parallel: bool,
        /// Optional limit that accompanies `parallel`.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag.
        stop_on_exception: bool,
        /// Aggregation setting, kept as a plain string here (accepted values
        /// are not visible in this file).
        aggregation: String,
    },
    /// Delay configured through a `DelayConfig`.
    Delay {
        /// Delay configuration.
        config: camel_api::DelayConfig,
    },
    /// Loop configured through a [`LoopConfig`].
    Loop {
        /// Loop configuration.
        config: LoopConfig,
        /// Nested steps attached to the loop.
        steps: Vec<BuilderStep>,
    },
    /// Loop described by an optional count and/or an optional `while`
    /// expression.
    DeclarativeLoop {
        /// Optional iteration count.
        count: Option<usize>,
        /// Optional `while` predicate expression.
        while_predicate: Option<LanguageExpressionDef>,
        /// Nested steps attached to the loop.
        steps: Vec<BuilderStep>,
    },
}
172
173impl std::fmt::Debug for BuilderStep {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
177 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
178 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
179 BuilderStep::Log { level, message } => write!(
180 f,
181 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
182 ),
183 BuilderStep::DeclarativeSetHeader { key, .. } => {
184 write!(
185 f,
186 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
187 )
188 }
189 BuilderStep::DeclarativeSetBody { .. } => {
190 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
191 }
192 BuilderStep::DeclarativeFilter { steps, .. } => {
193 write!(
194 f,
195 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
196 )
197 }
198 BuilderStep::DeclarativeChoice { whens, otherwise } => {
199 write!(
200 f,
201 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
202 whens.len(),
203 if otherwise.is_some() { "Some" } else { "None" }
204 )
205 }
206 BuilderStep::DeclarativeScript { expression } => write!(
207 f,
208 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
209 expression.language
210 ),
211 BuilderStep::DeclarativeSplit { steps, .. } => {
212 write!(
213 f,
214 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
215 )
216 }
217 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
218 f,
219 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
220 expression.language
221 ),
222 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
223 f,
224 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
225 expression.language
226 ),
227 BuilderStep::Split { steps, .. } => {
228 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
229 }
230 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
231 BuilderStep::Filter { steps, .. } => {
232 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
233 }
234 BuilderStep::Choice { whens, otherwise } => {
235 write!(
236 f,
237 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
238 whens.len(),
239 if otherwise.is_some() { "Some" } else { "None" }
240 )
241 }
242 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
243 BuilderStep::Multicast { steps, .. } => {
244 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
245 }
246 BuilderStep::DeclarativeLog { level, .. } => {
247 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
248 }
249 BuilderStep::Bean { name, method } => {
250 write!(
251 f,
252 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
253 )
254 }
255 BuilderStep::Script { language, .. } => {
256 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
257 }
258 BuilderStep::Throttle { steps, .. } => {
259 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
260 }
261 BuilderStep::LoadBalance { steps, .. } => {
262 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
263 }
264 BuilderStep::DynamicRouter { .. } => {
265 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
266 }
267 BuilderStep::RoutingSlip { .. } => {
268 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
269 }
270 BuilderStep::RecipientList { .. } => {
271 write!(f, "BuilderStep::RecipientList {{ .. }}")
272 }
273 BuilderStep::DeclarativeRecipientList {
274 expression,
275 aggregation,
276 ..
277 } => write!(
278 f,
279 "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
280 expression.language, aggregation
281 ),
282 BuilderStep::Delay { config } => {
283 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
284 }
285 BuilderStep::Loop { config, steps } => {
286 write!(f, "BuilderStep::Loop {{ config: {:?}, steps: {} }}", config.mode_name(), steps.len())
287 }
288 BuilderStep::DeclarativeLoop {
289 count,
290 while_predicate,
291 steps,
292 } => {
293 write!(
294 f,
295 "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
296 count,
297 while_predicate.is_some(),
298 steps.len()
299 )
300 }
301 }
302 }
303}
304
/// Description of a route: its source URI, its steps, and optional per-route
/// settings.
///
/// Fields are crate-private; outside the crate they are read through the
/// accessor methods and set through the `with_*` builder methods.
pub struct RouteDefinition {
    /// Source URI string the definition was created with.
    pub(crate) from_uri: String,
    /// Steps of the route, in the order given to `new`.
    pub(crate) steps: Vec<BuilderStep>,
    /// Error-handler configuration; `None` unless `with_error_handler` was called.
    pub(crate) error_handler: Option<ErrorHandlerConfig>,
    /// Circuit-breaker configuration; `None` unless `with_circuit_breaker` was called.
    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
    /// Unit-of-work configuration; `None` unless `with_unit_of_work` was called.
    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
    /// Concurrency model override; `None` unless `with_concurrency` was called.
    pub(crate) concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `new` leaves it empty until `with_route_id` sets it.
    pub(crate) route_id: String,
    /// Auto-startup flag; `new` initialises it to `true`.
    pub(crate) auto_startup: bool,
    /// Startup order; `new` initialises it to `1000`.
    pub(crate) startup_order: i32,
    /// Optional 64-bit hash; `None` unless `with_source_hash` was called
    /// (what is hashed is not visible in this file).
    pub(crate) source_hash: Option<u64>,
}
326
327impl RouteDefinition {
328 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
330 Self {
331 from_uri: from_uri.into(),
332 steps,
333 error_handler: None,
334 circuit_breaker: None,
335 unit_of_work: None,
336 concurrency: None,
337 route_id: String::new(), auto_startup: true,
339 startup_order: 1000,
340 source_hash: None,
341 }
342 }
343
344 pub fn from_uri(&self) -> &str {
346 &self.from_uri
347 }
348
349 pub fn steps(&self) -> &[BuilderStep] {
351 &self.steps
352 }
353
354 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
356 self.error_handler = Some(config);
357 self
358 }
359
360 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
362 self.error_handler.as_ref()
363 }
364
365 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
367 self.circuit_breaker = Some(config);
368 self
369 }
370
371 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
373 self.unit_of_work = Some(config);
374 self
375 }
376
377 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
379 self.unit_of_work.as_ref()
380 }
381
382 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
384 self.circuit_breaker.as_ref()
385 }
386
387 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
389 self.concurrency.as_ref()
390 }
391
392 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
394 self.concurrency = Some(model);
395 self
396 }
397
398 pub fn route_id(&self) -> &str {
400 &self.route_id
401 }
402
403 pub fn auto_startup(&self) -> bool {
405 self.auto_startup
406 }
407
408 pub fn startup_order(&self) -> i32 {
410 self.startup_order
411 }
412
413 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
415 self.route_id = id.into();
416 self
417 }
418
419 pub fn with_auto_startup(mut self, auto: bool) -> Self {
421 self.auto_startup = auto;
422 self
423 }
424
425 pub fn with_startup_order(mut self, order: i32) -> Self {
427 self.startup_order = order;
428 self
429 }
430
431 pub fn with_source_hash(mut self, hash: u64) -> Self {
432 self.source_hash = Some(hash);
433 self
434 }
435
436 pub fn source_hash(&self) -> Option<u64> {
437 self.source_hash
438 }
439
440 pub fn to_info(&self) -> RouteDefinitionInfo {
443 RouteDefinitionInfo {
444 route_id: self.route_id.clone(),
445 auto_startup: self.auto_startup,
446 startup_order: self.startup_order,
447 source_hash: self.source_hash,
448 }
449 }
450}
451
/// Metadata-only view of a route: id, auto-startup flag, startup order and
/// optional source hash.
///
/// Every field is a plain standard-library type, so besides `Clone` the type
/// also derives `Debug` (for logging and assertion messages) and
/// `PartialEq`/`Eq` (so two snapshots can be compared directly).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Route identifier.
    route_id: String,
    /// Auto-startup flag.
    auto_startup: bool,
    /// Startup order value.
    startup_order: i32,
    /// Optional 64-bit source hash.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Route identifier.
    pub fn route_id(&self) -> &str {
        self.route_id.as_str()
    }

    /// Auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Startup order value.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Source hash, if one is recorded.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
485
/// Unit tests for the route-definition model: builder/accessor round trips on
/// [`RouteDefinition`], the metadata snapshot produced by `to_info`, and the
/// hand-written `Debug` output of [`BuilderStep`].
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
        // No hash is recorded unless `with_source_hash` is called.
        assert_eq!(def.source_hash(), None);
    }

    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50)
            .with_source_hash(0xDEAD_BEEF);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
        assert_eq!(def.source_hash(), Some(0xDEAD_BEEF));
    }

    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeDynamicRouter {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 1000,
                ignore_invalid_endpoints: false,
                max_iterations: 10,
            },
            BuilderStep::DeclarativeRoutingSlip {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 1000,
                ignore_invalid_endpoints: true,
            },
            BuilderStep::DeclarativeRecipientList {
                expression: expr.clone(),
                delimiter: ",".into(),
                parallel: false,
                parallel_limit: None,
                stop_on_exception: false,
                aggregation: "original".into(),
            },
            BuilderStep::DeclarativeLoop {
                count: Some(3),
                while_predicate: None,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for step in &steps {
            let dbg = format!("{step:?}");
            // Every arm of the Debug impl prefixes its output with the enum name.
            assert!(
                dbg.starts_with("BuilderStep::"),
                "unexpected Debug output: {dbg}"
            );
        }
    }

    #[test]
    fn test_builder_step_debug_declarative_summaries() {
        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let recipient_list = BuilderStep::DeclarativeRecipientList {
            expression: expr.clone(),
            delimiter: ",".into(),
            parallel: false,
            parallel_limit: None,
            stop_on_exception: false,
            aggregation: "original".into(),
        };
        assert_eq!(
            format!("{recipient_list:?}"),
            "BuilderStep::DeclarativeRecipientList { language: \"simple\", aggregation: \"original\", .. }"
        );

        let declarative_loop = BuilderStep::DeclarativeLoop {
            count: Some(3),
            while_predicate: Some(expr),
            steps: vec![BuilderStep::Stop, BuilderStep::Stop],
        };
        assert_eq!(
            format!("{declarative_loop:?}"),
            "BuilderStep::DeclarativeLoop { count: Some(3), while: true, steps: 2 }"
        );
    }

    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .with_source_hash(99)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
        assert_eq!(info.source_hash(), Some(99));

        // Without an explicit hash the snapshot carries none either.
        let unhashed = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .to_info();
        assert_eq!(unhashed.source_hash(), None);
    }

    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
        assert_eq!(
            debug,
            "BuilderStep::Choice { whens: 1 clause(s), otherwise: None }"
        );
    }

    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        // `config` is not used again, so it is moved rather than cloned.
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config);
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}