1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component::ConcurrencyModel;
9
10pub struct WhenStep {
12 pub predicate: FilterPredicate,
13 pub steps: Vec<BuilderStep>,
14}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
18#[derive(Debug)]
20pub struct DeclarativeWhenStep {
21 pub predicate: LanguageExpressionDef,
22 pub steps: Vec<BuilderStep>,
23}
24
25pub enum BuilderStep {
27 Processor(BoxProcessor),
29 To(String),
31 Stop,
33 Log {
35 level: camel_processor::LogLevel,
36 message: String,
37 },
38 DeclarativeSetHeader { key: String, value: ValueSourceDef },
40 DeclarativeSetBody { value: ValueSourceDef },
42 DeclarativeFilter {
44 predicate: LanguageExpressionDef,
45 steps: Vec<BuilderStep>,
46 },
47 DeclarativeChoice {
49 whens: Vec<DeclarativeWhenStep>,
50 otherwise: Option<Vec<BuilderStep>>,
51 },
52 DeclarativeScript { expression: LanguageExpressionDef },
54 DeclarativeSplit {
56 expression: LanguageExpressionDef,
57 aggregation: camel_api::splitter::AggregationStrategy,
58 parallel: bool,
59 parallel_limit: Option<usize>,
60 stop_on_exception: bool,
61 steps: Vec<BuilderStep>,
62 },
63 Split {
65 config: SplitterConfig,
66 steps: Vec<BuilderStep>,
67 },
68 Aggregate { config: AggregatorConfig },
70 Filter {
72 predicate: FilterPredicate,
73 steps: Vec<BuilderStep>,
74 },
75 Choice {
78 whens: Vec<WhenStep>,
79 otherwise: Option<Vec<BuilderStep>>,
80 },
81 WireTap { uri: String },
83 Multicast {
85 steps: Vec<BuilderStep>,
86 config: MulticastConfig,
87 },
88 DeclarativeLog {
90 level: camel_processor::LogLevel,
91 message: ValueSourceDef,
92 },
93 Bean { name: String, method: String },
95 Script { language: String, script: String },
98 Throttle {
100 config: camel_api::ThrottlerConfig,
101 steps: Vec<BuilderStep>,
102 },
103 LoadBalance {
105 config: camel_api::LoadBalancerConfig,
106 steps: Vec<BuilderStep>,
107 },
108 DynamicRouter {
110 config: camel_api::DynamicRouterConfig,
111 },
112 RoutingSlip {
113 config: camel_api::RoutingSlipConfig,
114 },
115}
116
117impl std::fmt::Debug for BuilderStep {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 match self {
120 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
121 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
122 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
123 BuilderStep::Log { level, message } => write!(
124 f,
125 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
126 ),
127 BuilderStep::DeclarativeSetHeader { key, .. } => {
128 write!(
129 f,
130 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
131 )
132 }
133 BuilderStep::DeclarativeSetBody { .. } => {
134 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
135 }
136 BuilderStep::DeclarativeFilter { steps, .. } => {
137 write!(
138 f,
139 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
140 )
141 }
142 BuilderStep::DeclarativeChoice { whens, otherwise } => {
143 write!(
144 f,
145 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
146 whens.len(),
147 if otherwise.is_some() { "Some" } else { "None" }
148 )
149 }
150 BuilderStep::DeclarativeScript { expression } => write!(
151 f,
152 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
153 expression.language
154 ),
155 BuilderStep::DeclarativeSplit { steps, .. } => {
156 write!(
157 f,
158 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
159 )
160 }
161 BuilderStep::Split { steps, .. } => {
162 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
163 }
164 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
165 BuilderStep::Filter { steps, .. } => {
166 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
167 }
168 BuilderStep::Choice { whens, otherwise } => {
169 write!(
170 f,
171 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
172 whens.len(),
173 if otherwise.is_some() { "Some" } else { "None" }
174 )
175 }
176 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
177 BuilderStep::Multicast { steps, .. } => {
178 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
179 }
180 BuilderStep::DeclarativeLog { level, .. } => {
181 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
182 }
183 BuilderStep::Bean { name, method } => {
184 write!(
185 f,
186 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
187 )
188 }
189 BuilderStep::Script { language, .. } => {
190 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
191 }
192 BuilderStep::Throttle { steps, .. } => {
193 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
194 }
195 BuilderStep::LoadBalance { steps, .. } => {
196 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
197 }
198 BuilderStep::DynamicRouter { .. } => {
199 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
200 }
201 BuilderStep::RoutingSlip { .. } => {
202 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
203 }
204 }
205 }
206}
207
208pub struct RouteDefinition {
210 pub(crate) from_uri: String,
211 pub(crate) steps: Vec<BuilderStep>,
212 pub(crate) error_handler: Option<ErrorHandlerConfig>,
214 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
216 pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
218 pub(crate) concurrency: Option<ConcurrencyModel>,
221 pub(crate) route_id: String,
223 pub(crate) auto_startup: bool,
225 pub(crate) startup_order: i32,
227}
228
229impl RouteDefinition {
230 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
232 Self {
233 from_uri: from_uri.into(),
234 steps,
235 error_handler: None,
236 circuit_breaker: None,
237 unit_of_work: None,
238 concurrency: None,
239 route_id: String::new(), auto_startup: true,
241 startup_order: 1000,
242 }
243 }
244
245 pub fn from_uri(&self) -> &str {
247 &self.from_uri
248 }
249
250 pub fn steps(&self) -> &[BuilderStep] {
252 &self.steps
253 }
254
255 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
257 self.error_handler = Some(config);
258 self
259 }
260
261 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
263 self.error_handler.as_ref()
264 }
265
266 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
268 self.circuit_breaker = Some(config);
269 self
270 }
271
272 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
274 self.unit_of_work = Some(config);
275 self
276 }
277
278 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
280 self.unit_of_work.as_ref()
281 }
282
283 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
285 self.circuit_breaker.as_ref()
286 }
287
288 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
290 self.concurrency.as_ref()
291 }
292
293 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
295 self.concurrency = Some(model);
296 self
297 }
298
299 pub fn route_id(&self) -> &str {
301 &self.route_id
302 }
303
304 pub fn auto_startup(&self) -> bool {
306 self.auto_startup
307 }
308
309 pub fn startup_order(&self) -> i32 {
311 self.startup_order
312 }
313
314 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
316 self.route_id = id.into();
317 self
318 }
319
320 pub fn with_auto_startup(mut self, auto: bool) -> Self {
322 self.auto_startup = auto;
323 self
324 }
325
326 pub fn with_startup_order(mut self, order: i32) -> Self {
328 self.startup_order = order;
329 self
330 }
331
332 pub fn to_info(&self) -> RouteDefinitionInfo {
335 RouteDefinitionInfo {
336 route_id: self.route_id.clone(),
337 auto_startup: self.auto_startup,
338 startup_order: self.startup_order,
339 }
340 }
341}
342
/// Metadata-only view of a route: its id, auto-startup flag and startup order.
///
/// Produced by `RouteDefinition::to_info`. It holds no steps or configuration
/// objects, only a `String`, a `bool` and an `i32`, so it can be cloned, compared
/// and printed freely.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Identifier of the route.
    route_id: String,
    /// Auto-startup flag copied from the definition.
    auto_startup: bool,
    /// Startup order value copied from the definition.
    startup_order: i32,
}

impl RouteDefinitionInfo {
    /// Returns the route id.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Returns the auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Returns the startup order value.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }
}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Multicast` step can be built from nested steps plus a config.
    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    /// `new` leaves the route id empty, auto-startup on and the order at 1000.
    #[test]
    fn test_route_definition_defaults() {
        let fresh = RouteDefinition::new("direct:test", vec![]);
        assert_eq!(fresh.route_id(), "");

        let def = fresh.with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
    }

    /// The metadata builders overwrite the defaults.
    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    /// `from_uri` and `steps` expose what was passed to `new`.
    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    /// Optional configs set through the builders are visible through the accessors.
    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    /// Every `BuilderStep` variant formats through the hand-written `Debug` impl and
    /// the output always starts with the `BuilderStep::` prefix.
    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            // Choice and Multicast were previously missing from this list.
            BuilderStep::Choice {
                whens: vec![WhenStep {
                    predicate: Arc::new(|_: &Exchange| true),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: None,
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::Multicast {
                steps: vec![BuilderStep::Stop],
                config: camel_api::MulticastConfig::new(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for step in steps {
            let dbg = format!("{step:?}");
            // Every arm of the Debug impl begins with this prefix, so this also
            // guarantees the output is non-empty.
            assert!(
                dbg.starts_with("BuilderStep::"),
                "unexpected Debug output: {dbg}"
            );
        }
    }

    /// `to_info` carries over id, auto-startup and startup order.
    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    /// `Choice` formats without requiring `Debug` on the predicate.
    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
    }

    /// The unit-of-work config is stored when set and absent otherwise.
    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        // `config` is not used again, so it is moved instead of cloned.
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config);
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}