Skip to main content

camel_core/
route.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12    MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16/// A Route defines a message flow: from a source endpoint, through a composed
17/// Tower Service pipeline.
18pub struct Route {
19    /// The source endpoint URI.
20    pub(crate) from_uri: String,
21    /// The composed processor pipeline as a type-erased Tower Service.
22    pub(crate) pipeline: BoxProcessor,
23    /// Optional per-route concurrency model override.
24    /// When `None`, the consumer's default concurrency model is used.
25    pub(crate) concurrency: Option<ConcurrencyModel>,
26}
27
28impl Route {
29    /// Create a new route from the given source URI and processor pipeline.
30    pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
31        Self {
32            from_uri: from_uri.into(),
33            pipeline,
34            concurrency: None,
35        }
36    }
37
38    /// The source endpoint URI.
39    pub fn from_uri(&self) -> &str {
40        &self.from_uri
41    }
42
43    /// Consume the route and return its pipeline.
44    pub fn into_pipeline(self) -> BoxProcessor {
45        self.pipeline
46    }
47
48    /// Set a concurrency model override for this route.
49    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
50        self.concurrency = Some(model);
51        self
52    }
53
54    /// Get the concurrency model override, if any.
55    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
56        self.concurrency.as_ref()
57    }
58
59    /// Consume the route, returning the pipeline and optional concurrency override.
60    pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
61        (self.pipeline, self.concurrency)
62    }
63}
64
65/// A step in an unresolved route definition.
66pub enum BuilderStep {
67    /// A pre-built Tower processor service.
68    Processor(BoxProcessor),
69    /// A destination URI — resolved at start time by CamelContext.
70    To(String),
71    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
72    Split {
73        config: SplitterConfig,
74        steps: Vec<BuilderStep>,
75    },
76    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
77    Aggregate { config: AggregatorConfig },
78    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
79    Filter {
80        predicate: FilterPredicate,
81        steps: Vec<BuilderStep>,
82    },
83    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
84    WireTap { uri: String },
85    /// A Multicast step: sends the same exchange to multiple destinations.
86    Multicast {
87        steps: Vec<BuilderStep>,
88        config: MulticastConfig,
89    },
90}
91
92impl std::fmt::Debug for BuilderStep {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        match self {
95            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
96            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
97            BuilderStep::Split { steps, .. } => {
98                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
99            }
100            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
101            BuilderStep::Filter { steps, .. } => {
102                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
103            }
104            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
105            BuilderStep::Multicast { steps, .. } => {
106                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
107            }
108        }
109    }
110}
111
112/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
113pub struct RouteDefinition {
114    pub(crate) from_uri: String,
115    pub(crate) steps: Vec<BuilderStep>,
116    /// Optional per-route error handler config. Takes precedence over the global one.
117    pub(crate) error_handler: Option<ErrorHandlerConfig>,
118    /// Optional circuit breaker config. Applied between error handler and step pipeline.
119    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
120    /// User override for the consumer's concurrency model. `None` means
121    /// "use whatever the consumer declares".
122    pub(crate) concurrency: Option<ConcurrencyModel>,
123    /// Optional unique identifier for this route. If not set, one will be generated.
124    pub(crate) route_id: Option<String>,
125    /// Whether this route should start automatically when the context starts.
126    pub(crate) auto_startup: bool,
127    /// Order in which routes are started. Lower values start first.
128    pub(crate) startup_order: i32,
129}
130
131impl RouteDefinition {
132    /// Create a new route definition.
133    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
134        Self {
135            from_uri: from_uri.into(),
136            steps,
137            error_handler: None,
138            circuit_breaker: None,
139            concurrency: None,
140            route_id: None,
141            auto_startup: true,
142            startup_order: 1000,
143        }
144    }
145
146    /// The source endpoint URI.
147    pub fn from_uri(&self) -> &str {
148        &self.from_uri
149    }
150
151    /// The steps in this route definition.
152    pub fn steps(&self) -> &[BuilderStep] {
153        &self.steps
154    }
155
156    /// Set a per-route error handler, overriding the global one.
157    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
158        self.error_handler = Some(config);
159        self
160    }
161
162    /// Set a circuit breaker for this route.
163    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
164        self.circuit_breaker = Some(config);
165        self
166    }
167
168    /// Get the circuit breaker config, if set.
169    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
170        self.circuit_breaker.as_ref()
171    }
172
173    /// User-specified concurrency override, if any.
174    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
175        self.concurrency.as_ref()
176    }
177
178    /// Override the consumer's concurrency model for this route.
179    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
180        self.concurrency = Some(model);
181        self
182    }
183
184    /// Get the route ID, if explicitly set.
185    pub fn route_id(&self) -> Option<&str> {
186        self.route_id.as_deref()
187    }
188
189    /// Whether this route should start automatically when the context starts.
190    pub fn auto_startup(&self) -> bool {
191        self.auto_startup
192    }
193
194    /// Order in which routes are started. Lower values start first.
195    pub fn startup_order(&self) -> i32 {
196        self.startup_order
197    }
198
199    /// Set a unique identifier for this route.
200    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
201        self.route_id = Some(id.into());
202        self
203    }
204
205    /// Set whether this route should start automatically.
206    pub fn with_auto_startup(mut self, auto: bool) -> Self {
207        self.auto_startup = auto;
208        self
209    }
210
211    /// Set the startup order. Lower values start first.
212    pub fn with_startup_order(mut self, order: i32) -> Self {
213        self.startup_order = order;
214        self
215    }
216
217    /// Extract the metadata fields needed for introspection.
218    /// This is used by RouteController to store route info without the non-Sync steps.
219    pub fn to_info(&self) -> RouteDefinitionInfo {
220        RouteDefinitionInfo {
221            route_id: self.route_id.clone(),
222            auto_startup: self.auto_startup,
223            startup_order: self.startup_order,
224        }
225    }
226}
227
228/// Minimal route definition metadata for introspection.
229///
230/// This struct contains only the metadata fields from [`RouteDefinition`]
231/// that are needed for route lifecycle management, without the `steps` field
232/// (which contains non-Sync types and cannot be stored in a Sync struct).
233#[derive(Clone)]
234pub struct RouteDefinitionInfo {
235    route_id: Option<String>,
236    auto_startup: bool,
237    startup_order: i32,
238}
239
240impl RouteDefinitionInfo {
241    /// Get the route ID, if explicitly set.
242    pub fn route_id(&self) -> Option<&str> {
243        self.route_id.as_deref()
244    }
245
246    /// Whether this route should start automatically when the context starts.
247    pub fn auto_startup(&self) -> bool {
248        self.auto_startup
249    }
250
251    /// Order in which routes are started. Lower values start first.
252    pub fn startup_order(&self) -> i32 {
253        self.startup_order
254    }
255}
256
257/// Compose a list of BoxProcessors into a single pipeline that runs them sequentially.
258pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
259    if processors.is_empty() {
260        return BoxProcessor::new(IdentityProcessor);
261    }
262    BoxProcessor::new(SequentialPipeline { steps: processors })
263}
264
265/// A service that executes a sequence of BoxProcessors in order.
266#[derive(Clone)]
267struct SequentialPipeline {
268    steps: Vec<BoxProcessor>,
269}
270
271impl Service<Exchange> for SequentialPipeline {
272    type Response = Exchange;
273    type Error = CamelError;
274    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
275
276    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277        if let Some(first) = self.steps.first_mut() {
278            first.poll_ready(cx)
279        } else {
280            Poll::Ready(Ok(()))
281        }
282    }
283
284    fn call(&mut self, exchange: Exchange) -> Self::Future {
285        let mut steps = self.steps.clone();
286        Box::pin(async move {
287            let mut ex = exchange;
288            for step in &mut steps {
289                ex = step.ready().await?.call(ex).await?;
290            }
291            Ok(ex)
292        })
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use camel_api::BoxProcessorExt;
300    use std::sync::Arc;
301    use std::sync::atomic::{AtomicBool, Ordering};
302
303    /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
304    #[derive(Clone)]
305    struct DelayedReadyService {
306        ready: Arc<AtomicBool>,
307    }
308
309    impl Service<Exchange> for DelayedReadyService {
310        type Response = Exchange;
311        type Error = CamelError;
312        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
313
314        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
315            if self.ready.fetch_or(true, Ordering::SeqCst) {
316                // Already marked ready (second+ call) → Ready
317                Poll::Ready(Ok(()))
318            } else {
319                // First call → Pending, schedule a wake
320                cx.waker().wake_by_ref();
321                Poll::Pending
322            }
323        }
324
325        fn call(&mut self, ex: Exchange) -> Self::Future {
326            Box::pin(async move { Ok(ex) })
327        }
328    }
329
330    #[test]
331    fn test_pipeline_poll_ready_delegates_to_first_step() {
332        let waker = futures::task::noop_waker();
333        let mut cx = Context::from_waker(&waker);
334
335        let inner = DelayedReadyService {
336            ready: Arc::new(AtomicBool::new(false)),
337        };
338        let boxed = BoxProcessor::new(inner);
339        let mut pipeline = SequentialPipeline { steps: vec![boxed] };
340
341        // First poll_ready: inner returns Pending, so pipeline must too.
342        let first = pipeline.poll_ready(&mut cx);
343        assert!(first.is_pending(), "expected Pending on first poll_ready");
344
345        // Second poll_ready: inner returns Ready, so pipeline must too.
346        let second = pipeline.poll_ready(&mut cx);
347        assert!(second.is_ready(), "expected Ready on second poll_ready");
348    }
349
350    #[test]
351    fn test_pipeline_poll_ready_with_empty_steps() {
352        let waker = futures::task::noop_waker();
353        let mut cx = Context::from_waker(&waker);
354
355        let mut pipeline = SequentialPipeline { steps: vec![] };
356
357        // Empty pipeline should be immediately ready.
358        let result = pipeline.poll_ready(&mut cx);
359        assert!(result.is_ready(), "expected Ready for empty pipeline");
360    }
361
362    // When a step in the pipeline returns Err(CamelError::Stopped), the pipeline
363    // should halt further steps and propagate Err(Stopped). The context loop is
364    // responsible for silencing Stopped (treating it as a graceful halt, not an error).
365    #[tokio::test]
366    async fn test_pipeline_stops_gracefully_on_stopped_error() {
367        use std::sync::{
368            Arc,
369            atomic::{AtomicBool, Ordering},
370        };
371
372        // A flag to detect if a step AFTER stop() was called.
373        let after_called = Arc::new(AtomicBool::new(false));
374        let after_called_clone = after_called.clone();
375
376        let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
377        let after_step = BoxProcessor::from_fn(move |ex| {
378            after_called_clone.store(true, Ordering::SeqCst);
379            Box::pin(async move { Ok(ex) })
380        });
381
382        let mut pipeline = SequentialPipeline {
383            steps: vec![stop_step, after_step],
384        };
385
386        let ex = Exchange::new(camel_api::Message::new("hello"));
387        let result = pipeline.call(ex).await;
388
389        // Pipeline propagates Stopped — callers (context loop) are responsible for silencing it.
390        assert!(
391            matches!(result, Err(CamelError::Stopped)),
392            "expected Err(Stopped), got: {:?}",
393            result
394        );
395        // The step after stop must NOT have been called.
396        assert!(
397            !after_called.load(Ordering::SeqCst),
398            "step after stop should not be called"
399        );
400    }
401
402    #[test]
403    fn test_builder_step_multicast_variant() {
404        use camel_api::MulticastConfig;
405
406        let step = BuilderStep::Multicast {
407            steps: vec![BuilderStep::To("direct:a".into())],
408            config: MulticastConfig::new(),
409        };
410
411        assert!(matches!(step, BuilderStep::Multicast { .. }));
412    }
413
414    #[test]
415    fn test_route_definition_defaults() {
416        let def = RouteDefinition::new("direct:test", vec![]);
417        assert_eq!(def.route_id(), None);
418        assert!(def.auto_startup());
419        assert_eq!(def.startup_order(), 1000);
420    }
421
422    #[test]
423    fn test_route_definition_builders() {
424        let def = RouteDefinition::new("direct:test", vec![])
425            .with_route_id("my-route")
426            .with_auto_startup(false)
427            .with_startup_order(50);
428        assert_eq!(def.route_id(), Some("my-route"));
429        assert!(!def.auto_startup());
430        assert_eq!(def.startup_order(), 50);
431    }
432}