1use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::error_handler::ErrorHandlerConfig;
6use camel_api::UnitOfWorkConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component_api::ConcurrencyModel;
9
10pub struct WhenStep {
12 pub predicate: FilterPredicate,
13 pub steps: Vec<BuilderStep>,
14}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
18#[derive(Debug)]
20pub struct DeclarativeWhenStep {
21 pub predicate: LanguageExpressionDef,
22 pub steps: Vec<BuilderStep>,
23}
24
25pub enum BuilderStep {
27 Processor(BoxProcessor),
29 To(String),
31 Stop,
33 Log {
35 level: camel_processor::LogLevel,
36 message: String,
37 },
38 DeclarativeSetHeader {
40 key: String,
41 value: ValueSourceDef,
42 },
43 DeclarativeSetBody {
45 value: ValueSourceDef,
46 },
47 DeclarativeFilter {
49 predicate: LanguageExpressionDef,
50 steps: Vec<BuilderStep>,
51 },
52 DeclarativeChoice {
54 whens: Vec<DeclarativeWhenStep>,
55 otherwise: Option<Vec<BuilderStep>>,
56 },
57 DeclarativeScript {
59 expression: LanguageExpressionDef,
60 },
61 DeclarativeSplit {
63 expression: LanguageExpressionDef,
64 aggregation: camel_api::splitter::AggregationStrategy,
65 parallel: bool,
66 parallel_limit: Option<usize>,
67 stop_on_exception: bool,
68 steps: Vec<BuilderStep>,
69 },
70 DeclarativeDynamicRouter {
71 expression: LanguageExpressionDef,
72 uri_delimiter: String,
73 cache_size: i32,
74 ignore_invalid_endpoints: bool,
75 max_iterations: usize,
76 },
77 DeclarativeRoutingSlip {
78 expression: LanguageExpressionDef,
79 uri_delimiter: String,
80 cache_size: i32,
81 ignore_invalid_endpoints: bool,
82 },
83 Split {
85 config: SplitterConfig,
86 steps: Vec<BuilderStep>,
87 },
88 Aggregate {
90 config: AggregatorConfig,
91 },
92 Filter {
94 predicate: FilterPredicate,
95 steps: Vec<BuilderStep>,
96 },
97 Choice {
100 whens: Vec<WhenStep>,
101 otherwise: Option<Vec<BuilderStep>>,
102 },
103 WireTap {
105 uri: String,
106 },
107 Multicast {
109 steps: Vec<BuilderStep>,
110 config: MulticastConfig,
111 },
112 DeclarativeLog {
114 level: camel_processor::LogLevel,
115 message: ValueSourceDef,
116 },
117 Bean {
119 name: String,
120 method: String,
121 },
122 Script {
125 language: String,
126 script: String,
127 },
128 Throttle {
130 config: camel_api::ThrottlerConfig,
131 steps: Vec<BuilderStep>,
132 },
133 LoadBalance {
135 config: camel_api::LoadBalancerConfig,
136 steps: Vec<BuilderStep>,
137 },
138 DynamicRouter {
140 config: camel_api::DynamicRouterConfig,
141 },
142 RoutingSlip {
143 config: camel_api::RoutingSlipConfig,
144 },
145 Delay {
146 config: camel_api::DelayConfig,
147 },
148}
149
150impl std::fmt::Debug for BuilderStep {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 match self {
153 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
154 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
155 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
156 BuilderStep::Log { level, message } => write!(
157 f,
158 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
159 ),
160 BuilderStep::DeclarativeSetHeader { key, .. } => {
161 write!(
162 f,
163 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
164 )
165 }
166 BuilderStep::DeclarativeSetBody { .. } => {
167 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
168 }
169 BuilderStep::DeclarativeFilter { steps, .. } => {
170 write!(
171 f,
172 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
173 )
174 }
175 BuilderStep::DeclarativeChoice { whens, otherwise } => {
176 write!(
177 f,
178 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
179 whens.len(),
180 if otherwise.is_some() { "Some" } else { "None" }
181 )
182 }
183 BuilderStep::DeclarativeScript { expression } => write!(
184 f,
185 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
186 expression.language
187 ),
188 BuilderStep::DeclarativeSplit { steps, .. } => {
189 write!(
190 f,
191 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
192 )
193 }
194 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
195 f,
196 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
197 expression.language
198 ),
199 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
200 f,
201 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
202 expression.language
203 ),
204 BuilderStep::Split { steps, .. } => {
205 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
206 }
207 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
208 BuilderStep::Filter { steps, .. } => {
209 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
210 }
211 BuilderStep::Choice { whens, otherwise } => {
212 write!(
213 f,
214 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
215 whens.len(),
216 if otherwise.is_some() { "Some" } else { "None" }
217 )
218 }
219 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
220 BuilderStep::Multicast { steps, .. } => {
221 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
222 }
223 BuilderStep::DeclarativeLog { level, .. } => {
224 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
225 }
226 BuilderStep::Bean { name, method } => {
227 write!(
228 f,
229 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
230 )
231 }
232 BuilderStep::Script { language, .. } => {
233 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
234 }
235 BuilderStep::Throttle { steps, .. } => {
236 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
237 }
238 BuilderStep::LoadBalance { steps, .. } => {
239 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
240 }
241 BuilderStep::DynamicRouter { .. } => {
242 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
243 }
244 BuilderStep::RoutingSlip { .. } => {
245 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
246 }
247 BuilderStep::Delay { config } => {
248 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
249 }
250 }
251 }
252}
253
254pub struct RouteDefinition {
256 pub(crate) from_uri: String,
257 pub(crate) steps: Vec<BuilderStep>,
258 pub(crate) error_handler: Option<ErrorHandlerConfig>,
260 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
262 pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
264 pub(crate) concurrency: Option<ConcurrencyModel>,
267 pub(crate) route_id: String,
269 pub(crate) auto_startup: bool,
271 pub(crate) startup_order: i32,
273}
274
275impl RouteDefinition {
276 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
278 Self {
279 from_uri: from_uri.into(),
280 steps,
281 error_handler: None,
282 circuit_breaker: None,
283 unit_of_work: None,
284 concurrency: None,
285 route_id: String::new(), auto_startup: true,
287 startup_order: 1000,
288 }
289 }
290
291 pub fn from_uri(&self) -> &str {
293 &self.from_uri
294 }
295
296 pub fn steps(&self) -> &[BuilderStep] {
298 &self.steps
299 }
300
301 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
303 self.error_handler = Some(config);
304 self
305 }
306
307 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
309 self.error_handler.as_ref()
310 }
311
312 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
314 self.circuit_breaker = Some(config);
315 self
316 }
317
318 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
320 self.unit_of_work = Some(config);
321 self
322 }
323
324 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
326 self.unit_of_work.as_ref()
327 }
328
329 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
331 self.circuit_breaker.as_ref()
332 }
333
334 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
336 self.concurrency.as_ref()
337 }
338
339 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
341 self.concurrency = Some(model);
342 self
343 }
344
345 pub fn route_id(&self) -> &str {
347 &self.route_id
348 }
349
350 pub fn auto_startup(&self) -> bool {
352 self.auto_startup
353 }
354
355 pub fn startup_order(&self) -> i32 {
357 self.startup_order
358 }
359
360 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
362 self.route_id = id.into();
363 self
364 }
365
366 pub fn with_auto_startup(mut self, auto: bool) -> Self {
368 self.auto_startup = auto;
369 self
370 }
371
372 pub fn with_startup_order(mut self, order: i32) -> Self {
374 self.startup_order = order;
375 self
376 }
377
378 pub fn to_info(&self) -> RouteDefinitionInfo {
381 RouteDefinitionInfo {
382 route_id: self.route_id.clone(),
383 auto_startup: self.auto_startup,
384 startup_order: self.startup_order,
385 }
386 }
387}
388
389#[derive(Clone)]
395pub struct RouteDefinitionInfo {
396 route_id: String,
397 auto_startup: bool,
398 startup_order: i32,
399}
400
401impl RouteDefinitionInfo {
402 pub fn route_id(&self) -> &str {
404 &self.route_id
405 }
406
407 pub fn auto_startup(&self) -> bool {
409 self.auto_startup
410 }
411
412 pub fn startup_order(&self) -> i32 {
414 self.startup_order
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421
422 #[test]
423 fn test_builder_step_multicast_variant() {
424 use camel_api::MulticastConfig;
425
426 let step = BuilderStep::Multicast {
427 steps: vec![BuilderStep::To("direct:a".into())],
428 config: MulticastConfig::new(),
429 };
430
431 assert!(matches!(step, BuilderStep::Multicast { .. }));
432 }
433
434 #[test]
435 fn test_route_definition_defaults() {
436 let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
437 assert_eq!(def.route_id(), "test-route");
438 assert!(def.auto_startup());
439 assert_eq!(def.startup_order(), 1000);
440 }
441
442 #[test]
443 fn test_route_definition_builders() {
444 let def = RouteDefinition::new("direct:test", vec![])
445 .with_route_id("my-route")
446 .with_auto_startup(false)
447 .with_startup_order(50);
448 assert_eq!(def.route_id(), "my-route");
449 assert!(!def.auto_startup());
450 assert_eq!(def.startup_order(), 50);
451 }
452
453 #[test]
454 fn test_route_definition_accessors_cover_core_fields() {
455 let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
456 .with_route_id("accessor-route");
457
458 assert_eq!(def.from_uri(), "direct:in");
459 assert_eq!(def.steps().len(), 1);
460 assert!(matches!(def.steps()[0], BuilderStep::To(_)));
461 }
462
463 #[test]
464 fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
465 use camel_api::circuit_breaker::CircuitBreakerConfig;
466 use camel_api::error_handler::ErrorHandlerConfig;
467 use camel_component_api::ConcurrencyModel;
468
469 let def = RouteDefinition::new("direct:test", vec![])
470 .with_route_id("eh-route")
471 .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
472 .with_circuit_breaker(CircuitBreakerConfig::new())
473 .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });
474
475 let eh = def
476 .error_handler_config()
477 .expect("error handler should be set");
478 assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
479 assert!(def.circuit_breaker_config().is_some());
480 assert!(matches!(
481 def.concurrency_override(),
482 Some(ConcurrencyModel::Concurrent { max: Some(4) })
483 ));
484 }
485
486 #[test]
487 fn test_builder_step_debug_covers_many_variants() {
488 use camel_api::splitter::{split_body_lines, AggregationStrategy, SplitterConfig};
489 use camel_api::{
490 DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
491 };
492 use std::sync::Arc;
493
494 let expr = LanguageExpressionDef {
495 language: "simple".into(),
496 source: "${body}".into(),
497 };
498
499 let steps = vec![
500 BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
501 BuilderStep::To("mock:out".into()),
502 BuilderStep::Stop,
503 BuilderStep::Log {
504 level: camel_processor::LogLevel::Info,
505 message: "hello".into(),
506 },
507 BuilderStep::DeclarativeSetHeader {
508 key: "k".into(),
509 value: ValueSourceDef::Literal(Value::String("v".into())),
510 },
511 BuilderStep::DeclarativeSetBody {
512 value: ValueSourceDef::Expression(expr.clone()),
513 },
514 BuilderStep::DeclarativeFilter {
515 predicate: expr.clone(),
516 steps: vec![BuilderStep::Stop],
517 },
518 BuilderStep::DeclarativeChoice {
519 whens: vec![DeclarativeWhenStep {
520 predicate: expr.clone(),
521 steps: vec![BuilderStep::Stop],
522 }],
523 otherwise: Some(vec![BuilderStep::Stop]),
524 },
525 BuilderStep::DeclarativeScript {
526 expression: expr.clone(),
527 },
528 BuilderStep::DeclarativeSplit {
529 expression: expr.clone(),
530 aggregation: AggregationStrategy::Original,
531 parallel: false,
532 parallel_limit: Some(2),
533 stop_on_exception: true,
534 steps: vec![BuilderStep::Stop],
535 },
536 BuilderStep::Split {
537 config: SplitterConfig::new(split_body_lines()),
538 steps: vec![BuilderStep::Stop],
539 },
540 BuilderStep::Aggregate {
541 config: camel_api::AggregatorConfig::correlate_by("id")
542 .complete_when_size(1)
543 .build(),
544 },
545 BuilderStep::Filter {
546 predicate: Arc::new(|_: &Exchange| true),
547 steps: vec![BuilderStep::Stop],
548 },
549 BuilderStep::WireTap {
550 uri: "mock:tap".into(),
551 },
552 BuilderStep::DeclarativeLog {
553 level: camel_processor::LogLevel::Info,
554 message: ValueSourceDef::Expression(expr.clone()),
555 },
556 BuilderStep::Bean {
557 name: "bean".into(),
558 method: "call".into(),
559 },
560 BuilderStep::Script {
561 language: "rhai".into(),
562 script: "body".into(),
563 },
564 BuilderStep::Throttle {
565 config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
566 steps: vec![BuilderStep::Stop],
567 },
568 BuilderStep::LoadBalance {
569 config: camel_api::LoadBalancerConfig::round_robin(),
570 steps: vec![BuilderStep::To("mock:l1".into())],
571 },
572 BuilderStep::DynamicRouter {
573 config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
574 },
575 BuilderStep::RoutingSlip {
576 config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
577 },
578 ];
579
580 for step in steps {
581 let dbg = format!("{step:?}");
582 assert!(!dbg.is_empty());
583 }
584 }
585
586 #[test]
587 fn test_route_definition_to_info_preserves_metadata() {
588 let info = RouteDefinition::new("direct:test", vec![])
589 .with_route_id("meta-route")
590 .with_auto_startup(false)
591 .with_startup_order(7)
592 .to_info();
593
594 assert_eq!(info.route_id(), "meta-route");
595 assert!(!info.auto_startup());
596 assert_eq!(info.startup_order(), 7);
597 }
598
599 #[test]
600 fn test_choice_builder_step_debug() {
601 use camel_api::{Exchange, FilterPredicate};
602 use std::sync::Arc;
603
604 fn always_true(_: &Exchange) -> bool {
605 true
606 }
607
608 let step = BuilderStep::Choice {
609 whens: vec![WhenStep {
610 predicate: Arc::new(always_true) as FilterPredicate,
611 steps: vec![BuilderStep::To("mock:a".into())],
612 }],
613 otherwise: None,
614 };
615 let debug = format!("{step:?}");
616 assert!(debug.contains("Choice"));
617 }
618
619 #[test]
620 fn test_route_definition_unit_of_work() {
621 use camel_api::UnitOfWorkConfig;
622 let config = UnitOfWorkConfig {
623 on_complete: Some("log:complete".into()),
624 on_failure: Some("log:failed".into()),
625 };
626 let def = RouteDefinition::new("direct:test", vec![])
627 .with_route_id("uow-test")
628 .with_unit_of_work(config.clone());
629 assert_eq!(
630 def.unit_of_work_config().unwrap().on_complete.as_deref(),
631 Some("log:complete")
632 );
633 assert_eq!(
634 def.unit_of_work_config().unwrap().on_failure.as_deref(),
635 Some("log:failed")
636 );
637
638 let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
639 assert!(def_no_uow.unit_of_work_config().is_none());
640 }
641}