Skip to main content

camel_core/
route.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12    MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16/// A Route defines a message flow: from a source endpoint, through a composed
17/// Tower Service pipeline.
18pub struct Route {
19    /// The source endpoint URI.
20    pub(crate) from_uri: String,
21    /// The composed processor pipeline as a type-erased Tower Service.
22    pub(crate) pipeline: BoxProcessor,
23    /// Optional per-route concurrency model override.
24    /// When `None`, the consumer's default concurrency model is used.
25    pub(crate) concurrency: Option<ConcurrencyModel>,
26}
27
28impl Route {
29    /// Create a new route from the given source URI and processor pipeline.
30    pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
31        Self {
32            from_uri: from_uri.into(),
33            pipeline,
34            concurrency: None,
35        }
36    }
37
38    /// The source endpoint URI.
39    pub fn from_uri(&self) -> &str {
40        &self.from_uri
41    }
42
43    /// Consume the route and return its pipeline.
44    pub fn into_pipeline(self) -> BoxProcessor {
45        self.pipeline
46    }
47
48    /// Set a concurrency model override for this route.
49    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
50        self.concurrency = Some(model);
51        self
52    }
53
54    /// Get the concurrency model override, if any.
55    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
56        self.concurrency.as_ref()
57    }
58
59    /// Consume the route, returning the pipeline and optional concurrency override.
60    pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
61        (self.pipeline, self.concurrency)
62    }
63}
64
65/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
66pub struct WhenStep {
67    pub predicate: FilterPredicate,
68    pub steps: Vec<BuilderStep>,
69}
70
71/// A step in an unresolved route definition.
72pub enum BuilderStep {
73    /// A pre-built Tower processor service.
74    Processor(BoxProcessor),
75    /// A destination URI — resolved at start time by CamelContext.
76    To(String),
77    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
78    Split {
79        config: SplitterConfig,
80        steps: Vec<BuilderStep>,
81    },
82    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
83    Aggregate { config: AggregatorConfig },
84    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
85    Filter {
86        predicate: FilterPredicate,
87        steps: Vec<BuilderStep>,
88    },
89    /// A Choice step: evaluates when-clauses in order, routes to the first match.
90    /// If no when matches, the optional otherwise branch is used.
91    Choice {
92        whens: Vec<WhenStep>,
93        otherwise: Option<Vec<BuilderStep>>,
94    },
95    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
96    WireTap { uri: String },
97    /// A Multicast step: sends the same exchange to multiple destinations.
98    Multicast {
99        steps: Vec<BuilderStep>,
100        config: MulticastConfig,
101    },
102}
103
104impl std::fmt::Debug for BuilderStep {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
108            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
109            BuilderStep::Split { steps, .. } => {
110                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
111            }
112            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
113            BuilderStep::Filter { steps, .. } => {
114                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
115            }
116            BuilderStep::Choice { whens, otherwise } => {
117                write!(
118                    f,
119                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
120                    whens.len(),
121                    if otherwise.is_some() { "Some" } else { "None" }
122                )
123            }
124            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
125            BuilderStep::Multicast { steps, .. } => {
126                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
127            }
128        }
129    }
130}
131
132/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
133pub struct RouteDefinition {
134    pub(crate) from_uri: String,
135    pub(crate) steps: Vec<BuilderStep>,
136    /// Optional per-route error handler config. Takes precedence over the global one.
137    pub(crate) error_handler: Option<ErrorHandlerConfig>,
138    /// Optional circuit breaker config. Applied between error handler and step pipeline.
139    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
140    /// User override for the consumer's concurrency model. `None` means
141    /// "use whatever the consumer declares".
142    pub(crate) concurrency: Option<ConcurrencyModel>,
143    /// Unique identifier for this route. Required.
144    pub(crate) route_id: String,
145    /// Whether this route should start automatically when the context starts.
146    pub(crate) auto_startup: bool,
147    /// Order in which routes are started. Lower values start first.
148    pub(crate) startup_order: i32,
149}
150
151impl RouteDefinition {
152    /// Create a new route definition with the required route ID.
153    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
154        Self {
155            from_uri: from_uri.into(),
156            steps,
157            error_handler: None,
158            circuit_breaker: None,
159            concurrency: None,
160            route_id: String::new(), // Will be set by with_route_id()
161            auto_startup: true,
162            startup_order: 1000,
163        }
164    }
165
166    /// The source endpoint URI.
167    pub fn from_uri(&self) -> &str {
168        &self.from_uri
169    }
170
171    /// The steps in this route definition.
172    pub fn steps(&self) -> &[BuilderStep] {
173        &self.steps
174    }
175
176    /// Set a per-route error handler, overriding the global one.
177    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
178        self.error_handler = Some(config);
179        self
180    }
181
182    /// Set a circuit breaker for this route.
183    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
184        self.circuit_breaker = Some(config);
185        self
186    }
187
188    /// Get the circuit breaker config, if set.
189    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
190        self.circuit_breaker.as_ref()
191    }
192
193    /// User-specified concurrency override, if any.
194    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
195        self.concurrency.as_ref()
196    }
197
198    /// Override the consumer's concurrency model for this route.
199    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
200        self.concurrency = Some(model);
201        self
202    }
203
204    /// Get the route ID.
205    pub fn route_id(&self) -> &str {
206        &self.route_id
207    }
208
209    /// Whether this route should start automatically when the context starts.
210    pub fn auto_startup(&self) -> bool {
211        self.auto_startup
212    }
213
214    /// Order in which routes are started. Lower values start first.
215    pub fn startup_order(&self) -> i32 {
216        self.startup_order
217    }
218
219    /// Set a unique identifier for this route.
220    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
221        self.route_id = id.into();
222        self
223    }
224
225    /// Set whether this route should start automatically.
226    pub fn with_auto_startup(mut self, auto: bool) -> Self {
227        self.auto_startup = auto;
228        self
229    }
230
231    /// Set the startup order. Lower values start first.
232    pub fn with_startup_order(mut self, order: i32) -> Self {
233        self.startup_order = order;
234        self
235    }
236
237    /// Extract the metadata fields needed for introspection.
238    /// This is used by RouteController to store route info without the non-Sync steps.
239    pub fn to_info(&self) -> RouteDefinitionInfo {
240        RouteDefinitionInfo {
241            route_id: self.route_id.clone(),
242            auto_startup: self.auto_startup,
243            startup_order: self.startup_order,
244        }
245    }
246}
247
248/// Minimal route definition metadata for introspection.
249///
250/// This struct contains only the metadata fields from [`RouteDefinition`]
251/// that are needed for route lifecycle management, without the `steps` field
252/// (which contains non-Sync types and cannot be stored in a Sync struct).
253#[derive(Clone)]
254pub struct RouteDefinitionInfo {
255    route_id: String,
256    auto_startup: bool,
257    startup_order: i32,
258}
259
260impl RouteDefinitionInfo {
261    /// Get the route ID.
262    pub fn route_id(&self) -> &str {
263        &self.route_id
264    }
265
266    /// Whether this route should start automatically when the context starts.
267    pub fn auto_startup(&self) -> bool {
268        self.auto_startup
269    }
270
271    /// Order in which routes are started. Lower values start first.
272    pub fn startup_order(&self) -> i32 {
273        self.startup_order
274    }
275}
276
277/// Compose a list of BoxProcessors into a single pipeline that runs them sequentially.
278pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
279    if processors.is_empty() {
280        return BoxProcessor::new(IdentityProcessor);
281    }
282    BoxProcessor::new(SequentialPipeline { steps: processors })
283}
284
285/// A service that executes a sequence of BoxProcessors in order.
286#[derive(Clone)]
287struct SequentialPipeline {
288    steps: Vec<BoxProcessor>,
289}
290
291impl Service<Exchange> for SequentialPipeline {
292    type Response = Exchange;
293    type Error = CamelError;
294    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
295
296    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
297        if let Some(first) = self.steps.first_mut() {
298            first.poll_ready(cx)
299        } else {
300            Poll::Ready(Ok(()))
301        }
302    }
303
304    fn call(&mut self, exchange: Exchange) -> Self::Future {
305        let mut steps = self.steps.clone();
306        Box::pin(async move {
307            let mut ex = exchange;
308            for step in &mut steps {
309                ex = step.ready().await?.call(ex).await?;
310            }
311            Ok(ex)
312        })
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use camel_api::BoxProcessorExt;
320    use std::sync::Arc;
321    use std::sync::atomic::{AtomicBool, Ordering};
322
323    /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
324    #[derive(Clone)]
325    struct DelayedReadyService {
326        ready: Arc<AtomicBool>,
327    }
328
329    impl Service<Exchange> for DelayedReadyService {
330        type Response = Exchange;
331        type Error = CamelError;
332        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
333
334        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
335            if self.ready.fetch_or(true, Ordering::SeqCst) {
336                // Already marked ready (second+ call) → Ready
337                Poll::Ready(Ok(()))
338            } else {
339                // First call → Pending, schedule a wake
340                cx.waker().wake_by_ref();
341                Poll::Pending
342            }
343        }
344
345        fn call(&mut self, ex: Exchange) -> Self::Future {
346            Box::pin(async move { Ok(ex) })
347        }
348    }
349
350    #[test]
351    fn test_pipeline_poll_ready_delegates_to_first_step() {
352        let waker = futures::task::noop_waker();
353        let mut cx = Context::from_waker(&waker);
354
355        let inner = DelayedReadyService {
356            ready: Arc::new(AtomicBool::new(false)),
357        };
358        let boxed = BoxProcessor::new(inner);
359        let mut pipeline = SequentialPipeline { steps: vec![boxed] };
360
361        // First poll_ready: inner returns Pending, so pipeline must too.
362        let first = pipeline.poll_ready(&mut cx);
363        assert!(first.is_pending(), "expected Pending on first poll_ready");
364
365        // Second poll_ready: inner returns Ready, so pipeline must too.
366        let second = pipeline.poll_ready(&mut cx);
367        assert!(second.is_ready(), "expected Ready on second poll_ready");
368    }
369
370    #[test]
371    fn test_pipeline_poll_ready_with_empty_steps() {
372        let waker = futures::task::noop_waker();
373        let mut cx = Context::from_waker(&waker);
374
375        let mut pipeline = SequentialPipeline { steps: vec![] };
376
377        // Empty pipeline should be immediately ready.
378        let result = pipeline.poll_ready(&mut cx);
379        assert!(result.is_ready(), "expected Ready for empty pipeline");
380    }
381
382    // When a step in the pipeline returns Err(CamelError::Stopped), the pipeline
383    // should halt further steps and propagate Err(Stopped). The context loop is
384    // responsible for silencing Stopped (treating it as a graceful halt, not an error).
385    #[tokio::test]
386    async fn test_pipeline_stops_gracefully_on_stopped_error() {
387        use std::sync::{
388            Arc,
389            atomic::{AtomicBool, Ordering},
390        };
391
392        // A flag to detect if a step AFTER stop() was called.
393        let after_called = Arc::new(AtomicBool::new(false));
394        let after_called_clone = after_called.clone();
395
396        let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
397        let after_step = BoxProcessor::from_fn(move |ex| {
398            after_called_clone.store(true, Ordering::SeqCst);
399            Box::pin(async move { Ok(ex) })
400        });
401
402        let mut pipeline = SequentialPipeline {
403            steps: vec![stop_step, after_step],
404        };
405
406        let ex = Exchange::new(camel_api::Message::new("hello"));
407        let result = pipeline.call(ex).await;
408
409        // Pipeline propagates Stopped — callers (context loop) are responsible for silencing it.
410        assert!(
411            matches!(result, Err(CamelError::Stopped)),
412            "expected Err(Stopped), got: {:?}",
413            result
414        );
415        // The step after stop must NOT have been called.
416        assert!(
417            !after_called.load(Ordering::SeqCst),
418            "step after stop should not be called"
419        );
420    }
421
422    #[test]
423    fn test_builder_step_multicast_variant() {
424        use camel_api::MulticastConfig;
425
426        let step = BuilderStep::Multicast {
427            steps: vec![BuilderStep::To("direct:a".into())],
428            config: MulticastConfig::new(),
429        };
430
431        assert!(matches!(step, BuilderStep::Multicast { .. }));
432    }
433
434    #[test]
435    fn test_route_definition_defaults() {
436        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
437        assert_eq!(def.route_id(), "test-route");
438        assert!(def.auto_startup());
439        assert_eq!(def.startup_order(), 1000);
440    }
441
442    #[test]
443    fn test_route_definition_builders() {
444        let def = RouteDefinition::new("direct:test", vec![])
445            .with_route_id("my-route")
446            .with_auto_startup(false)
447            .with_startup_order(50);
448        assert_eq!(def.route_id(), "my-route");
449        assert!(!def.auto_startup());
450        assert_eq!(def.startup_order(), 50);
451    }
452
453    #[test]
454    fn test_choice_builder_step_debug() {
455        use camel_api::{Exchange, FilterPredicate};
456        use std::sync::Arc;
457
458        fn always_true(_: &Exchange) -> bool {
459            true
460        }
461
462        let step = BuilderStep::Choice {
463            whens: vec![crate::route::WhenStep {
464                predicate: Arc::new(always_true) as FilterPredicate,
465                steps: vec![BuilderStep::To("mock:a".into())],
466            }],
467            otherwise: None,
468        };
469        let debug = format!("{step:?}");
470        assert!(debug.contains("Choice"));
471    }
472}