Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::error_handler::ErrorHandlerConfig;
6use camel_api::multicast::{MulticastConfig, MulticastStrategy};
7use camel_api::splitter::SplitterConfig;
8use camel_api::{
9    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
10};
11use camel_component::ConcurrencyModel;
12use camel_core::route::{BuilderStep, RouteDefinition, WhenStep};
13use camel_processor::{
14    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService,
15};
16
17/// Shared step-accumulation methods for all builder types.
18///
19/// Implementors provide `steps_mut()` and get step-adding methods for free.
20/// `filter()` and other branching methods are NOT included — they return
21/// different types per builder and stay as per-builder methods.
22pub trait StepAccumulator: Sized {
23    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
24
25    fn to(mut self, endpoint: impl Into<String>) -> Self {
26        self.steps_mut().push(BuilderStep::To(endpoint.into()));
27        self
28    }
29
30    fn process<F, Fut>(mut self, f: F) -> Self
31    where
32        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
33        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
34    {
35        let svc = ProcessorFn::new(f);
36        self.steps_mut()
37            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
38        self
39    }
40
41    fn process_fn(mut self, processor: BoxProcessor) -> Self {
42        self.steps_mut().push(BuilderStep::Processor(processor));
43        self
44    }
45
46    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
47        let svc = SetHeader::new(IdentityProcessor, key, value);
48        self.steps_mut()
49            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
50        self
51    }
52
53    fn map_body<F>(mut self, mapper: F) -> Self
54    where
55        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
56    {
57        let svc = MapBody::new(IdentityProcessor, mapper);
58        self.steps_mut()
59            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
60        self
61    }
62
63    fn set_body<B>(mut self, body: B) -> Self
64    where
65        B: Into<Body> + Clone + Send + Sync + 'static,
66    {
67        let body: Body = body.into();
68        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn set_body_fn<F>(mut self, expr: F) -> Self
75    where
76        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = SetBody::new(IdentityProcessor, expr);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
85    where
86        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
87    {
88        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
89        self.steps_mut()
90            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
91        self
92    }
93
94    fn aggregate(mut self, config: AggregatorConfig) -> Self {
95        self.steps_mut().push(BuilderStep::Aggregate { config });
96        self
97    }
98
99    /// Stop processing this exchange immediately. No further steps in the
100    /// current pipeline will run.
101    ///
102    /// Can be used at any point in the route: directly on RouteBuilder,
103    /// inside `.filter()`, inside `.split()`, etc.
104    fn stop(mut self) -> Self {
105        self.steps_mut()
106            .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
107        self
108    }
109
110    /// Log a message at the specified level.
111    ///
112    /// The message will be logged when an exchange passes through this step.
113    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
114        use camel_processor::LogProcessor;
115        let svc = LogProcessor::new(level, message.into());
116        self.steps_mut()
117            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118        self
119    }
120
121    /// Convert the message body to the target type.
122    ///
123    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
124    /// Returns `TypeConversionFailed` if conversion is not possible.
125    ///
126    /// # Example
127    /// ```ignore
128    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
129    ///      .convert_body_to(BodyType::Json)
130    ///      .to("direct:next")
131    /// ```
132    fn convert_body_to(mut self, target: BodyType) -> Self {
133        let svc = ConvertBodyTo::new(IdentityProcessor, target);
134        self.steps_mut()
135            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
136        self
137    }
138}
139
140/// A fluent builder for constructing routes.
141///
142/// # Example
143///
144/// ```ignore
145/// let definition = RouteBuilder::from("timer:tick?period=1000")
146///     .set_header("source", Value::String("timer".into()))
147///     .filter(|ex| ex.input.body.as_text().is_some())
148///     .to("log:info?showHeaders=true")
149///     .build()?;
150/// ```
151pub struct RouteBuilder {
152    from_uri: String,
153    steps: Vec<BuilderStep>,
154    error_handler: Option<ErrorHandlerConfig>,
155    circuit_breaker_config: Option<CircuitBreakerConfig>,
156    concurrency: Option<ConcurrencyModel>,
157    route_id: Option<String>,
158    auto_startup: Option<bool>,
159    startup_order: Option<i32>,
160}
161
162impl RouteBuilder {
163    /// Start building a route from the given source endpoint URI.
164    pub fn from(endpoint: &str) -> Self {
165        Self {
166            from_uri: endpoint.to_string(),
167            steps: Vec::new(),
168            error_handler: None,
169            circuit_breaker_config: None,
170            concurrency: None,
171            route_id: None,
172            auto_startup: None,
173            startup_order: None,
174        }
175    }
176
177    /// Open a filter scope. Only exchanges matching `predicate` will be processed
178    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
179    /// and continue to steps after `.end_filter()`.
180    pub fn filter<F>(self, predicate: F) -> FilterBuilder
181    where
182        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
183    {
184        FilterBuilder {
185            parent: self,
186            predicate: std::sync::Arc::new(predicate),
187            steps: vec![],
188        }
189    }
190
191    /// Open a choice scope for content-based routing.
192    ///
193    /// Within the choice, you can define multiple `.when()` clauses and an
194    /// optional `.otherwise()` clause. The first matching `when` predicate
195    /// determines which sub-pipeline executes.
196    pub fn choice(self) -> ChoiceBuilder {
197        ChoiceBuilder {
198            parent: self,
199            whens: vec![],
200            _otherwise: None,
201        }
202    }
203
204    /// Add a WireTap step that sends a clone of the exchange to the given
205    /// endpoint URI (fire-and-forget). The original exchange continues
206    /// downstream unchanged.
207    pub fn wire_tap(mut self, endpoint: &str) -> Self {
208        self.steps.push(BuilderStep::WireTap {
209            uri: endpoint.to_string(),
210        });
211        self
212    }
213
214    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
215    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
216        self.error_handler = Some(config);
217        self
218    }
219
220    /// Set a circuit breaker for this route.
221    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
222        self.circuit_breaker_config = Some(config);
223        self
224    }
225
226    /// Override the consumer's default concurrency model.
227    ///
228    /// When set, the pipeline spawns a task per exchange, processing them
229    /// concurrently. `max` limits the number of simultaneously active
230    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
231    ///
232    /// # Example
233    /// ```ignore
234    /// RouteBuilder::from("http://0.0.0.0:8080/api")
235    ///     .concurrent(16)  // max 16 in-flight pipeline executions
236    ///     .process(handle_request)
237    ///     .build()
238    /// ```
239    pub fn concurrent(mut self, max: usize) -> Self {
240        let max = if max == 0 { None } else { Some(max) };
241        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
242        self
243    }
244
245    /// Force sequential processing, overriding a concurrent-capable consumer.
246    ///
247    /// Useful for HTTP routes that mutate shared state and need ordering
248    /// guarantees.
249    pub fn sequential(mut self) -> Self {
250        self.concurrency = Some(ConcurrencyModel::Sequential);
251        self
252    }
253
254    /// Set the route ID for this route.
255    ///
256    /// If not set, the route will be assigned an auto-generated ID.
257    pub fn route_id(mut self, id: impl Into<String>) -> Self {
258        self.route_id = Some(id.into());
259        self
260    }
261
262    /// Set whether this route should automatically start when the context starts.
263    ///
264    /// Default is `true`.
265    pub fn auto_startup(mut self, auto: bool) -> Self {
266        self.auto_startup = Some(auto);
267        self
268    }
269
270    /// Set the startup order for this route.
271    ///
272    /// Routes with lower values start first. Default is 1000.
273    pub fn startup_order(mut self, order: i32) -> Self {
274        self.startup_order = Some(order);
275        self
276    }
277
278    /// Begin a Splitter sub-pipeline. Steps added after this call (until
279    /// `.end_split()`) will be executed per-fragment.
280    ///
281    /// Returns a `SplitBuilder` — you cannot call `.build()` until
282    /// `.end_split()` closes the split scope (enforced by the type system).
283    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
284        SplitBuilder {
285            parent: self,
286            config,
287            steps: Vec::new(),
288        }
289    }
290
291    /// Begin a Multicast sub-pipeline. Steps added after this call (until
292    /// `.end_multicast()`) will each receive a copy of the exchange.
293    ///
294    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
295    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
296    pub fn multicast(self) -> MulticastBuilder {
297        MulticastBuilder {
298            parent: self,
299            steps: Vec::new(),
300            config: MulticastConfig::new(),
301        }
302    }
303
304    /// Consume the builder and produce a [`RouteDefinition`].
305    pub fn build(self) -> Result<RouteDefinition, CamelError> {
306        if self.from_uri.is_empty() {
307            return Err(CamelError::RouteError(
308                "route must have a 'from' URI".to_string(),
309            ));
310        }
311        let route_id = self.route_id.ok_or_else(|| {
312            CamelError::RouteError(
313                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
314                    .to_string(),
315            )
316        })?;
317        let definition = RouteDefinition::new(self.from_uri, self.steps);
318        let definition = if let Some(eh) = self.error_handler {
319            definition.with_error_handler(eh)
320        } else {
321            definition
322        };
323        let definition = if let Some(cb) = self.circuit_breaker_config {
324            definition.with_circuit_breaker(cb)
325        } else {
326            definition
327        };
328        let definition = if let Some(concurrency) = self.concurrency {
329            definition.with_concurrency(concurrency)
330        } else {
331            definition
332        };
333        let definition = definition.with_route_id(route_id);
334        let definition = if let Some(auto) = self.auto_startup {
335            definition.with_auto_startup(auto)
336        } else {
337            definition
338        };
339        let definition = if let Some(order) = self.startup_order {
340            definition.with_startup_order(order)
341        } else {
342            definition
343        };
344        Ok(definition)
345    }
346}
347
348impl StepAccumulator for RouteBuilder {
349    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
350        &mut self.steps
351    }
352}
353
354/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
355///
356/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
357/// but NOT `.build()` and NOT `.split()` (no nested splits).
358///
359/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
360/// and returns the parent `RouteBuilder`.
361pub struct SplitBuilder {
362    parent: RouteBuilder,
363    config: SplitterConfig,
364    steps: Vec<BuilderStep>,
365}
366
367impl SplitBuilder {
368    /// Open a filter scope within the split sub-pipeline.
369    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
370    where
371        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
372    {
373        FilterInSplitBuilder {
374            parent: self,
375            predicate: std::sync::Arc::new(predicate),
376            steps: vec![],
377        }
378    }
379
380    /// Close the split scope. Packages the accumulated sub-steps into a
381    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
382    pub fn end_split(mut self) -> RouteBuilder {
383        let split_step = BuilderStep::Split {
384            config: self.config,
385            steps: self.steps,
386        };
387        self.parent.steps.push(split_step);
388        self.parent
389    }
390}
391
392impl StepAccumulator for SplitBuilder {
393    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
394        &mut self.steps
395    }
396}
397
398/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
399pub struct FilterBuilder {
400    parent: RouteBuilder,
401    predicate: FilterPredicate,
402    steps: Vec<BuilderStep>,
403}
404
405impl FilterBuilder {
406    /// Close the filter scope. Packages the accumulated sub-steps into a
407    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
408    pub fn end_filter(mut self) -> RouteBuilder {
409        let step = BuilderStep::Filter {
410            predicate: self.predicate,
411            steps: self.steps,
412        };
413        self.parent.steps.push(step);
414        self.parent
415    }
416}
417
418impl StepAccumulator for FilterBuilder {
419    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
420        &mut self.steps
421    }
422}
423
424/// Builder for a filter scope nested inside a `.split()` block.
425pub struct FilterInSplitBuilder {
426    parent: SplitBuilder,
427    predicate: FilterPredicate,
428    steps: Vec<BuilderStep>,
429}
430
431impl FilterInSplitBuilder {
432    /// Close the filter scope and return the parent `SplitBuilder`.
433    pub fn end_filter(mut self) -> SplitBuilder {
434        let step = BuilderStep::Filter {
435            predicate: self.predicate,
436            steps: self.steps,
437        };
438        self.parent.steps.push(step);
439        self.parent
440    }
441}
442
443impl StepAccumulator for FilterInSplitBuilder {
444    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
445        &mut self.steps
446    }
447}
448
449// ── Choice/When/Otherwise builders ─────────────────────────────────────────
450
451/// Builder for a `.choice()` ... `.end_choice()` block.
452///
453/// Accumulates `when` clauses and an optional `otherwise` clause.
454/// Cannot call `.build()` until `.end_choice()` is called.
455pub struct ChoiceBuilder {
456    parent: RouteBuilder,
457    whens: Vec<WhenStep>,
458    _otherwise: Option<Vec<BuilderStep>>,
459}
460
461impl ChoiceBuilder {
462    /// Open a `when` clause. Only exchanges matching `predicate` will be
463    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
464    pub fn when<F>(self, predicate: F) -> WhenBuilder
465    where
466        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
467    {
468        WhenBuilder {
469            parent: self,
470            predicate: std::sync::Arc::new(predicate),
471            steps: vec![],
472        }
473    }
474
475    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
476    ///
477    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
478    pub fn otherwise(self) -> OtherwiseBuilder {
479        OtherwiseBuilder {
480            parent: self,
481            steps: vec![],
482        }
483    }
484
485    /// Close the choice scope. Packages all accumulated `when` clauses and
486    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
487    /// parent `RouteBuilder`.
488    pub fn end_choice(mut self) -> RouteBuilder {
489        let step = BuilderStep::Choice {
490            whens: self.whens,
491            otherwise: self._otherwise,
492        };
493        self.parent.steps.push(step);
494        self.parent
495    }
496}
497
498/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
499pub struct WhenBuilder {
500    parent: ChoiceBuilder,
501    predicate: camel_api::FilterPredicate,
502    steps: Vec<BuilderStep>,
503}
504
505impl WhenBuilder {
506    /// Close the when scope. Packages the accumulated sub-steps into a
507    /// `WhenStep` and returns the parent `ChoiceBuilder`.
508    pub fn end_when(mut self) -> ChoiceBuilder {
509        self.parent.whens.push(WhenStep {
510            predicate: self.predicate,
511            steps: self.steps,
512        });
513        self.parent
514    }
515}
516
517impl StepAccumulator for WhenBuilder {
518    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
519        &mut self.steps
520    }
521}
522
523/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
524pub struct OtherwiseBuilder {
525    parent: ChoiceBuilder,
526    steps: Vec<BuilderStep>,
527}
528
529impl OtherwiseBuilder {
530    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
531    pub fn end_otherwise(self) -> ChoiceBuilder {
532        let OtherwiseBuilder { mut parent, steps } = self;
533        parent._otherwise = Some(steps);
534        parent
535    }
536}
537
538impl StepAccumulator for OtherwiseBuilder {
539    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
540        &mut self.steps
541    }
542}
543
544/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
545///
546/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
547/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
548///
549/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
550/// and returns the parent `RouteBuilder`.
551pub struct MulticastBuilder {
552    parent: RouteBuilder,
553    steps: Vec<BuilderStep>,
554    config: MulticastConfig,
555}
556
557impl MulticastBuilder {
558    pub fn parallel(mut self, parallel: bool) -> Self {
559        self.config = self.config.parallel(parallel);
560        self
561    }
562
563    pub fn parallel_limit(mut self, limit: usize) -> Self {
564        self.config = self.config.parallel_limit(limit);
565        self
566    }
567
568    pub fn stop_on_exception(mut self, stop: bool) -> Self {
569        self.config = self.config.stop_on_exception(stop);
570        self
571    }
572
573    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
574        self.config = self.config.timeout(duration);
575        self
576    }
577
578    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
579        self.config = self.config.aggregation(strategy);
580        self
581    }
582
583    pub fn end_multicast(mut self) -> RouteBuilder {
584        let step = BuilderStep::Multicast {
585            steps: self.steps,
586            config: self.config,
587        };
588        self.parent.steps.push(step);
589        self.parent
590    }
591}
592
593impl StepAccumulator for MulticastBuilder {
594    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
595        &mut self.steps
596    }
597}
598
599// ---------------------------------------------------------------------------
600// Tests
601// ---------------------------------------------------------------------------
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606    use camel_api::{Exchange, Message};
607    use camel_core::route::BuilderStep;
608    use tower::{Service, ServiceExt};
609
610    #[test]
611    fn test_builder_from_creates_definition() {
612        let definition = RouteBuilder::from("timer:tick")
613            .route_id("test-route")
614            .build()
615            .unwrap();
616        assert_eq!(definition.from_uri(), "timer:tick");
617    }
618
619    #[test]
620    fn test_builder_empty_from_uri_errors() {
621        let result = RouteBuilder::from("").route_id("test-route").build();
622        assert!(result.is_err());
623    }
624
625    #[test]
626    fn test_builder_to_adds_step() {
627        let definition = RouteBuilder::from("timer:tick")
628            .route_id("test-route")
629            .to("log:info")
630            .build()
631            .unwrap();
632
633        assert_eq!(definition.from_uri(), "timer:tick");
634        // We can verify steps were added by checking the structure
635        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
636    }
637
638    #[test]
639    fn test_builder_filter_adds_filter_step() {
640        let definition = RouteBuilder::from("timer:tick")
641            .route_id("test-route")
642            .filter(|_ex| true)
643            .to("mock:result")
644            .end_filter()
645            .build()
646            .unwrap();
647
648        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
649    }
650
651    #[test]
652    fn test_builder_set_header_adds_processor_step() {
653        let definition = RouteBuilder::from("timer:tick")
654            .route_id("test-route")
655            .set_header("key", Value::String("value".into()))
656            .build()
657            .unwrap();
658
659        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
660    }
661
662    #[test]
663    fn test_builder_map_body_adds_processor_step() {
664        let definition = RouteBuilder::from("timer:tick")
665            .route_id("test-route")
666            .map_body(|body| body)
667            .build()
668            .unwrap();
669
670        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
671    }
672
673    #[test]
674    fn test_builder_process_adds_processor_step() {
675        let definition = RouteBuilder::from("timer:tick")
676            .route_id("test-route")
677            .process(|ex| async move { Ok(ex) })
678            .build()
679            .unwrap();
680
681        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
682    }
683
684    #[test]
685    fn test_builder_chain_multiple_steps() {
686        let definition = RouteBuilder::from("timer:tick")
687            .route_id("test-route")
688            .set_header("source", Value::String("timer".into()))
689            .filter(|ex| ex.input.header("source").is_some())
690            .to("log:info")
691            .end_filter()
692            .to("mock:result")
693            .build()
694            .unwrap();
695
696        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
697        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
698        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
699        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
700    }
701
702    // -----------------------------------------------------------------------
703    // Processor behavior tests — exercise the real Tower services directly
704    // -----------------------------------------------------------------------
705
706    #[tokio::test]
707    async fn test_set_header_processor_works() {
708        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
709        let exchange = Exchange::new(Message::new("test"));
710        let result = svc.call(exchange).await.unwrap();
711        assert_eq!(
712            result.input.header("greeting"),
713            Some(&Value::String("hello".into()))
714        );
715    }
716
717    #[tokio::test]
718    async fn test_filter_processor_passes() {
719        use camel_api::BoxProcessorExt;
720        use camel_processor::FilterService;
721
722        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
723        let mut svc =
724            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
725        let exchange = Exchange::new(Message::new("pass"));
726        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
727        assert_eq!(result.input.body.as_text(), Some("pass"));
728    }
729
730    #[tokio::test]
731    async fn test_filter_processor_blocks() {
732        use camel_api::BoxProcessorExt;
733        use camel_processor::FilterService;
734
735        let sub = BoxProcessor::from_fn(|_ex| {
736            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
737        });
738        let mut svc =
739            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
740        let exchange = Exchange::new(Message::new("reject"));
741        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
742        assert_eq!(result.input.body.as_text(), Some("reject"));
743    }
744
745    #[tokio::test]
746    async fn test_map_body_processor_works() {
747        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
748            if let Some(text) = body.as_text() {
749                Body::Text(text.to_uppercase())
750            } else {
751                body
752            }
753        });
754        let exchange = Exchange::new(Message::new("hello"));
755        let result = mapper.oneshot(exchange).await.unwrap();
756        assert_eq!(result.input.body.as_text(), Some("HELLO"));
757    }
758
759    #[tokio::test]
760    async fn test_process_custom_processor_works() {
761        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
762            ex.set_property("custom", Value::Bool(true));
763            Ok(ex)
764        });
765        let exchange = Exchange::new(Message::default());
766        let result = processor.oneshot(exchange).await.unwrap();
767        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
768    }
769
770    // -----------------------------------------------------------------------
771    // Sequential pipeline test
772    // -----------------------------------------------------------------------
773
774    #[tokio::test]
775    async fn test_compose_pipeline_runs_steps_in_order() {
776        use camel_core::route::compose_pipeline;
777
778        let processors = vec![
779            BoxProcessor::new(SetHeader::new(
780                IdentityProcessor,
781                "step",
782                Value::String("one".into()),
783            )),
784            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
785                if let Some(text) = body.as_text() {
786                    Body::Text(format!("{}-processed", text))
787                } else {
788                    body
789                }
790            })),
791        ];
792
793        let pipeline = compose_pipeline(processors);
794        let exchange = Exchange::new(Message::new("hello"));
795        let result = pipeline.oneshot(exchange).await.unwrap();
796
797        assert_eq!(
798            result.input.header("step"),
799            Some(&Value::String("one".into()))
800        );
801        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
802    }
803
804    #[tokio::test]
805    async fn test_compose_pipeline_empty_is_identity() {
806        use camel_core::route::compose_pipeline;
807
808        let pipeline = compose_pipeline(vec![]);
809        let exchange = Exchange::new(Message::new("unchanged"));
810        let result = pipeline.oneshot(exchange).await.unwrap();
811        assert_eq!(result.input.body.as_text(), Some("unchanged"));
812    }
813
814    // -----------------------------------------------------------------------
815    // Circuit breaker builder tests
816    // -----------------------------------------------------------------------
817
818    #[test]
819    fn test_builder_circuit_breaker_sets_config() {
820        use camel_api::circuit_breaker::CircuitBreakerConfig;
821
822        let config = CircuitBreakerConfig::new().failure_threshold(5);
823        let definition = RouteBuilder::from("timer:tick")
824            .route_id("test-route")
825            .circuit_breaker(config)
826            .build()
827            .unwrap();
828
829        let cb = definition
830            .circuit_breaker_config()
831            .expect("circuit breaker should be set");
832        assert_eq!(cb.failure_threshold, 5);
833    }
834
835    #[test]
836    fn test_builder_circuit_breaker_with_error_handler() {
837        use camel_api::circuit_breaker::CircuitBreakerConfig;
838        use camel_api::error_handler::ErrorHandlerConfig;
839
840        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
841        let eh_config = ErrorHandlerConfig::log_only();
842
843        let definition = RouteBuilder::from("timer:tick")
844            .route_id("test-route")
845            .to("log:info")
846            .circuit_breaker(cb_config)
847            .error_handler(eh_config)
848            .build()
849            .unwrap();
850
851        assert!(
852            definition.circuit_breaker_config().is_some(),
853            "circuit breaker config should be set"
854        );
855        // Route definition was built successfully with both configs.
856    }
857
858    // --- Splitter builder tests ---
859
860    #[test]
861    fn test_split_builder_typestate() {
862        use camel_api::splitter::{SplitterConfig, split_body_lines};
863
864        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
865        let definition = RouteBuilder::from("timer:test?period=1000")
866            .route_id("test-route")
867            .split(SplitterConfig::new(split_body_lines()))
868            .to("mock:per-fragment")
869            .end_split()
870            .to("mock:final")
871            .build()
872            .unwrap();
873
874        // Should have 2 top-level steps: Split + To("mock:final")
875        assert_eq!(definition.steps().len(), 2);
876    }
877
878    #[test]
879    fn test_split_builder_steps_collected() {
880        use camel_api::splitter::{SplitterConfig, split_body_lines};
881
882        let definition = RouteBuilder::from("timer:test?period=1000")
883            .route_id("test-route")
884            .split(SplitterConfig::new(split_body_lines()))
885            .set_header("fragment", Value::String("yes".into()))
886            .to("mock:per-fragment")
887            .end_split()
888            .build()
889            .unwrap();
890
891        // Should have 1 top-level step: Split (containing 2 sub-steps)
892        assert_eq!(definition.steps().len(), 1);
893        match &definition.steps()[0] {
894            BuilderStep::Split { steps, .. } => {
895                assert_eq!(steps.len(), 2); // SetHeader + To
896            }
897            other => panic!("Expected Split, got {:?}", other),
898        }
899    }
900
901    #[test]
902    fn test_split_builder_config_propagated() {
903        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
904
905        let definition = RouteBuilder::from("timer:test?period=1000")
906            .route_id("test-route")
907            .split(
908                SplitterConfig::new(split_body_lines())
909                    .parallel(true)
910                    .parallel_limit(4)
911                    .aggregation(AggregationStrategy::CollectAll),
912            )
913            .to("mock:per-fragment")
914            .end_split()
915            .build()
916            .unwrap();
917
918        match &definition.steps()[0] {
919            BuilderStep::Split { config, .. } => {
920                assert!(config.parallel);
921                assert_eq!(config.parallel_limit, Some(4));
922                assert!(matches!(
923                    config.aggregation,
924                    AggregationStrategy::CollectAll
925                ));
926            }
927            other => panic!("Expected Split, got {:?}", other),
928        }
929    }
930
931    #[test]
932    fn test_aggregate_builder_adds_step() {
933        use camel_api::aggregator::AggregatorConfig;
934        use camel_core::route::BuilderStep;
935
936        let definition = RouteBuilder::from("timer:tick")
937            .route_id("test-route")
938            .aggregate(
939                AggregatorConfig::correlate_by("key")
940                    .complete_when_size(2)
941                    .build(),
942            )
943            .build()
944            .unwrap();
945
946        assert_eq!(definition.steps().len(), 1);
947        assert!(matches!(
948            definition.steps()[0],
949            BuilderStep::Aggregate { .. }
950        ));
951    }
952
953    #[test]
954    fn test_aggregate_in_split_builder() {
955        use camel_api::aggregator::AggregatorConfig;
956        use camel_api::splitter::{SplitterConfig, split_body_lines};
957        use camel_core::route::BuilderStep;
958
959        let definition = RouteBuilder::from("timer:tick")
960            .route_id("test-route")
961            .split(SplitterConfig::new(split_body_lines()))
962            .aggregate(
963                AggregatorConfig::correlate_by("key")
964                    .complete_when_size(1)
965                    .build(),
966            )
967            .end_split()
968            .build()
969            .unwrap();
970
971        assert_eq!(definition.steps().len(), 1);
972        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
973            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
974        } else {
975            panic!("expected Split step");
976        }
977    }
978
979    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
980
981    #[test]
982    fn test_builder_set_body_static_adds_processor() {
983        let definition = RouteBuilder::from("timer:tick")
984            .route_id("test-route")
985            .set_body("fixed")
986            .build()
987            .unwrap();
988        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
989    }
990
991    #[test]
992    fn test_builder_set_body_fn_adds_processor() {
993        let definition = RouteBuilder::from("timer:tick")
994            .route_id("test-route")
995            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
996            .build()
997            .unwrap();
998        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
999    }
1000
1001    #[test]
1002    fn test_builder_set_header_fn_adds_processor() {
1003        let definition = RouteBuilder::from("timer:tick")
1004            .route_id("test-route")
1005            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1006            .build()
1007            .unwrap();
1008        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1009    }
1010
1011    #[tokio::test]
1012    async fn test_set_body_static_processor_works() {
1013        use camel_core::route::compose_pipeline;
1014        let def = RouteBuilder::from("t:t")
1015            .route_id("test-route")
1016            .set_body("replaced")
1017            .build()
1018            .unwrap();
1019        let pipeline = compose_pipeline(
1020            def.steps()
1021                .iter()
1022                .filter_map(|s| {
1023                    if let BuilderStep::Processor(p) = s {
1024                        Some(p.clone())
1025                    } else {
1026                        None
1027                    }
1028                })
1029                .collect(),
1030        );
1031        let exchange = Exchange::new(Message::new("original"));
1032        let result = pipeline.oneshot(exchange).await.unwrap();
1033        assert_eq!(result.input.body.as_text(), Some("replaced"));
1034    }
1035
1036    #[tokio::test]
1037    async fn test_set_body_fn_processor_works() {
1038        use camel_core::route::compose_pipeline;
1039        let def = RouteBuilder::from("t:t")
1040            .route_id("test-route")
1041            .set_body_fn(|ex: &Exchange| {
1042                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1043            })
1044            .build()
1045            .unwrap();
1046        let pipeline = compose_pipeline(
1047            def.steps()
1048                .iter()
1049                .filter_map(|s| {
1050                    if let BuilderStep::Processor(p) = s {
1051                        Some(p.clone())
1052                    } else {
1053                        None
1054                    }
1055                })
1056                .collect(),
1057        );
1058        let exchange = Exchange::new(Message::new("hello"));
1059        let result = pipeline.oneshot(exchange).await.unwrap();
1060        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1061    }
1062
1063    #[tokio::test]
1064    async fn test_set_header_fn_processor_works() {
1065        use camel_core::route::compose_pipeline;
1066        let def = RouteBuilder::from("t:t")
1067            .route_id("test-route")
1068            .set_header_fn("echo", |ex: &Exchange| {
1069                ex.input
1070                    .body
1071                    .as_text()
1072                    .map(|t| Value::String(t.into()))
1073                    .unwrap_or(Value::Null)
1074            })
1075            .build()
1076            .unwrap();
1077        let pipeline = compose_pipeline(
1078            def.steps()
1079                .iter()
1080                .filter_map(|s| {
1081                    if let BuilderStep::Processor(p) = s {
1082                        Some(p.clone())
1083                    } else {
1084                        None
1085                    }
1086                })
1087                .collect(),
1088        );
1089        let exchange = Exchange::new(Message::new("ping"));
1090        let result = pipeline.oneshot(exchange).await.unwrap();
1091        assert_eq!(
1092            result.input.header("echo"),
1093            Some(&Value::String("ping".into()))
1094        );
1095    }
1096
1097    // ── FilterBuilder typestate tests ─────────────────────────────────────
1098
1099    #[test]
1100    fn test_filter_builder_typestate() {
1101        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1102            .route_id("test-route")
1103            .filter(|_ex| true)
1104            .to("mock:inner")
1105            .end_filter()
1106            .to("mock:outer")
1107            .build();
1108        assert!(result.is_ok());
1109    }
1110
1111    #[test]
1112    fn test_filter_builder_steps_collected() {
1113        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1114            .route_id("test-route")
1115            .filter(|_ex| true)
1116            .to("mock:inner")
1117            .end_filter()
1118            .build()
1119            .unwrap();
1120
1121        assert_eq!(definition.steps().len(), 1);
1122        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1123    }
1124
1125    #[test]
1126    fn test_wire_tap_builder_adds_step() {
1127        let definition = RouteBuilder::from("timer:tick")
1128            .route_id("test-route")
1129            .wire_tap("mock:tap")
1130            .to("mock:result")
1131            .build()
1132            .unwrap();
1133
1134        assert_eq!(definition.steps().len(), 2);
1135        assert!(
1136            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1137        );
1138        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1139    }
1140
1141    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1142
1143    #[test]
1144    fn test_multicast_builder_typestate() {
1145        let definition = RouteBuilder::from("timer:tick")
1146            .route_id("test-route")
1147            .multicast()
1148            .to("direct:a")
1149            .to("direct:b")
1150            .end_multicast()
1151            .to("mock:result")
1152            .build()
1153            .unwrap();
1154
1155        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1156    }
1157
1158    #[test]
1159    fn test_multicast_builder_steps_collected() {
1160        let definition = RouteBuilder::from("timer:tick")
1161            .route_id("test-route")
1162            .multicast()
1163            .to("direct:a")
1164            .to("direct:b")
1165            .end_multicast()
1166            .build()
1167            .unwrap();
1168
1169        match &definition.steps()[0] {
1170            BuilderStep::Multicast { steps, .. } => {
1171                assert_eq!(steps.len(), 2);
1172            }
1173            other => panic!("Expected Multicast, got {:?}", other),
1174        }
1175    }
1176
1177    // ── Concurrency builder tests ─────────────────────────────────────
1178
1179    #[test]
1180    fn test_builder_concurrent_sets_concurrency() {
1181        use camel_component::ConcurrencyModel;
1182
1183        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1184            .route_id("test-route")
1185            .concurrent(16)
1186            .to("log:info")
1187            .build()
1188            .unwrap();
1189
1190        assert_eq!(
1191            definition.concurrency_override(),
1192            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1193        );
1194    }
1195
1196    #[test]
1197    fn test_builder_concurrent_zero_means_unbounded() {
1198        use camel_component::ConcurrencyModel;
1199
1200        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1201            .route_id("test-route")
1202            .concurrent(0)
1203            .to("log:info")
1204            .build()
1205            .unwrap();
1206
1207        assert_eq!(
1208            definition.concurrency_override(),
1209            Some(&ConcurrencyModel::Concurrent { max: None })
1210        );
1211    }
1212
1213    #[test]
1214    fn test_builder_sequential_sets_concurrency() {
1215        use camel_component::ConcurrencyModel;
1216
1217        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1218            .route_id("test-route")
1219            .sequential()
1220            .to("log:info")
1221            .build()
1222            .unwrap();
1223
1224        assert_eq!(
1225            definition.concurrency_override(),
1226            Some(&ConcurrencyModel::Sequential)
1227        );
1228    }
1229
1230    #[test]
1231    fn test_builder_default_concurrency_is_none() {
1232        let definition = RouteBuilder::from("timer:tick")
1233            .route_id("test-route")
1234            .to("log:info")
1235            .build()
1236            .unwrap();
1237
1238        assert_eq!(definition.concurrency_override(), None);
1239    }
1240
1241    // ── Route lifecycle builder tests ─────────────────────────────────────
1242
1243    #[test]
1244    fn test_builder_route_id_sets_id() {
1245        let definition = RouteBuilder::from("timer:tick")
1246            .route_id("my-route")
1247            .build()
1248            .unwrap();
1249
1250        assert_eq!(definition.route_id(), "my-route");
1251    }
1252
1253    #[test]
1254    fn test_build_without_route_id_fails() {
1255        let result = RouteBuilder::from("timer:tick?period=1000")
1256            .to("log:info")
1257            .build();
1258        let err = match result {
1259            Err(e) => e.to_string(),
1260            Ok(_) => panic!("build() should fail without route_id"),
1261        };
1262        assert!(
1263            err.contains("route_id"),
1264            "error should mention route_id, got: {}",
1265            err
1266        );
1267    }
1268
1269    #[test]
1270    fn test_builder_auto_startup_false() {
1271        let definition = RouteBuilder::from("timer:tick")
1272            .route_id("test-route")
1273            .auto_startup(false)
1274            .build()
1275            .unwrap();
1276
1277        assert!(!definition.auto_startup());
1278    }
1279
1280    #[test]
1281    fn test_builder_startup_order_custom() {
1282        let definition = RouteBuilder::from("timer:tick")
1283            .route_id("test-route")
1284            .startup_order(50)
1285            .build()
1286            .unwrap();
1287
1288        assert_eq!(definition.startup_order(), 50);
1289    }
1290
1291    #[test]
1292    fn test_builder_defaults() {
1293        let definition = RouteBuilder::from("timer:tick")
1294            .route_id("test-route")
1295            .build()
1296            .unwrap();
1297
1298        assert_eq!(definition.route_id(), "test-route");
1299        assert!(definition.auto_startup());
1300        assert_eq!(definition.startup_order(), 1000);
1301    }
1302
1303    // ── Choice typestate tests ──────────────────────────────────────────────────
1304
1305    #[test]
1306    fn test_choice_builder_single_when() {
1307        let definition = RouteBuilder::from("timer:tick")
1308            .route_id("test-route")
1309            .choice()
1310            .when(|ex: &Exchange| ex.input.header("type").is_some())
1311            .to("mock:typed")
1312            .end_when()
1313            .end_choice()
1314            .build()
1315            .unwrap();
1316        assert_eq!(definition.steps().len(), 1);
1317        assert!(
1318            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1319            if whens.len() == 1 && otherwise.is_none())
1320        );
1321    }
1322
1323    #[test]
1324    fn test_choice_builder_when_otherwise() {
1325        let definition = RouteBuilder::from("timer:tick")
1326            .route_id("test-route")
1327            .choice()
1328            .when(|ex: &Exchange| ex.input.header("a").is_some())
1329            .to("mock:a")
1330            .end_when()
1331            .otherwise()
1332            .to("mock:fallback")
1333            .end_otherwise()
1334            .end_choice()
1335            .build()
1336            .unwrap();
1337        assert!(
1338            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1339            if whens.len() == 1 && otherwise.is_some())
1340        );
1341    }
1342
1343    #[test]
1344    fn test_choice_builder_multiple_whens() {
1345        let definition = RouteBuilder::from("timer:tick")
1346            .route_id("test-route")
1347            .choice()
1348            .when(|ex: &Exchange| ex.input.header("a").is_some())
1349            .to("mock:a")
1350            .end_when()
1351            .when(|ex: &Exchange| ex.input.header("b").is_some())
1352            .to("mock:b")
1353            .end_when()
1354            .end_choice()
1355            .build()
1356            .unwrap();
1357        assert!(
1358            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
1359            if whens.len() == 2)
1360        );
1361    }
1362
1363    #[test]
1364    fn test_choice_step_after_choice() {
1365        // Steps after end_choice() are added to the outer pipeline, not inside choice.
1366        let definition = RouteBuilder::from("timer:tick")
1367            .route_id("test-route")
1368            .choice()
1369            .when(|_ex: &Exchange| true)
1370            .to("mock:inner")
1371            .end_when()
1372            .end_choice()
1373            .to("mock:outer") // must be step[1], not inside choice
1374            .build()
1375            .unwrap();
1376        assert_eq!(definition.steps().len(), 2);
1377        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1378    }
1379}