1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12 MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16pub struct Route {
19 pub(crate) from_uri: String,
21 pub(crate) pipeline: BoxProcessor,
23 pub(crate) concurrency: Option<ConcurrencyModel>,
26}
27
28impl Route {
29 pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
31 Self {
32 from_uri: from_uri.into(),
33 pipeline,
34 concurrency: None,
35 }
36 }
37
38 pub fn from_uri(&self) -> &str {
40 &self.from_uri
41 }
42
43 pub fn into_pipeline(self) -> BoxProcessor {
45 self.pipeline
46 }
47
48 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
50 self.concurrency = Some(model);
51 self
52 }
53
54 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
56 self.concurrency.as_ref()
57 }
58
59 pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
61 (self.pipeline, self.concurrency)
62 }
63}
64
65pub struct WhenStep {
67 pub predicate: FilterPredicate,
68 pub steps: Vec<BuilderStep>,
69}
70
71pub enum BuilderStep {
73 Processor(BoxProcessor),
75 To(String),
77 Split {
79 config: SplitterConfig,
80 steps: Vec<BuilderStep>,
81 },
82 Aggregate { config: AggregatorConfig },
84 Filter {
86 predicate: FilterPredicate,
87 steps: Vec<BuilderStep>,
88 },
89 Choice {
92 whens: Vec<WhenStep>,
93 otherwise: Option<Vec<BuilderStep>>,
94 },
95 WireTap { uri: String },
97 Multicast {
99 steps: Vec<BuilderStep>,
100 config: MulticastConfig,
101 },
102}
103
104impl std::fmt::Debug for BuilderStep {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
108 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
109 BuilderStep::Split { steps, .. } => {
110 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
111 }
112 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
113 BuilderStep::Filter { steps, .. } => {
114 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
115 }
116 BuilderStep::Choice { whens, otherwise } => {
117 write!(
118 f,
119 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
120 whens.len(),
121 if otherwise.is_some() { "Some" } else { "None" }
122 )
123 }
124 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
125 BuilderStep::Multicast { steps, .. } => {
126 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
127 }
128 }
129 }
130}
131
132pub struct RouteDefinition {
134 pub(crate) from_uri: String,
135 pub(crate) steps: Vec<BuilderStep>,
136 pub(crate) error_handler: Option<ErrorHandlerConfig>,
138 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
140 pub(crate) concurrency: Option<ConcurrencyModel>,
143 pub(crate) route_id: String,
145 pub(crate) auto_startup: bool,
147 pub(crate) startup_order: i32,
149}
150
151impl RouteDefinition {
152 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
154 Self {
155 from_uri: from_uri.into(),
156 steps,
157 error_handler: None,
158 circuit_breaker: None,
159 concurrency: None,
160 route_id: String::new(), auto_startup: true,
162 startup_order: 1000,
163 }
164 }
165
166 pub fn from_uri(&self) -> &str {
168 &self.from_uri
169 }
170
171 pub fn steps(&self) -> &[BuilderStep] {
173 &self.steps
174 }
175
176 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
178 self.error_handler = Some(config);
179 self
180 }
181
182 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
184 self.circuit_breaker = Some(config);
185 self
186 }
187
188 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
190 self.circuit_breaker.as_ref()
191 }
192
193 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
195 self.concurrency.as_ref()
196 }
197
198 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
200 self.concurrency = Some(model);
201 self
202 }
203
204 pub fn route_id(&self) -> &str {
206 &self.route_id
207 }
208
209 pub fn auto_startup(&self) -> bool {
211 self.auto_startup
212 }
213
214 pub fn startup_order(&self) -> i32 {
216 self.startup_order
217 }
218
219 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
221 self.route_id = id.into();
222 self
223 }
224
225 pub fn with_auto_startup(mut self, auto: bool) -> Self {
227 self.auto_startup = auto;
228 self
229 }
230
231 pub fn with_startup_order(mut self, order: i32) -> Self {
233 self.startup_order = order;
234 self
235 }
236
237 pub fn to_info(&self) -> RouteDefinitionInfo {
240 RouteDefinitionInfo {
241 route_id: self.route_id.clone(),
242 auto_startup: self.auto_startup,
243 startup_order: self.startup_order,
244 }
245 }
246}
247
/// Plain-data snapshot of a route's id, auto-startup flag and startup order.
///
/// Holds only a `String`, a `bool` and an `i32`, so besides `Clone` it also
/// derives `Debug`, `PartialEq` and `Eq`; this lets values be logged and
/// compared directly (e.g. in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    route_id: String,
    auto_startup: bool,
    startup_order: i32,
}

impl RouteDefinitionInfo {
    /// Returns the route id.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Returns the auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Returns the startup order.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }
}
276
277pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
279 if processors.is_empty() {
280 return BoxProcessor::new(IdentityProcessor);
281 }
282 BoxProcessor::new(SequentialPipeline { steps: processors })
283}
284
285#[derive(Clone)]
287struct SequentialPipeline {
288 steps: Vec<BoxProcessor>,
289}
290
291impl Service<Exchange> for SequentialPipeline {
292 type Response = Exchange;
293 type Error = CamelError;
294 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
295
296 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
297 if let Some(first) = self.steps.first_mut() {
298 first.poll_ready(cx)
299 } else {
300 Poll::Ready(Ok(()))
301 }
302 }
303
304 fn call(&mut self, exchange: Exchange) -> Self::Future {
305 let mut steps = self.steps.clone();
306 Box::pin(async move {
307 let mut ex = exchange;
308 for step in &mut steps {
309 ex = step.ready().await?.call(ex).await?;
310 }
311 Ok(ex)
312 })
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::BoxProcessorExt;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double: the first `poll_ready` wakes the task and returns
    /// `Pending`; every later poll returns `Ready`. State lives in a shared flag.
    #[derive(Clone)]
    struct DelayedReadyService {
        ready: Arc<AtomicBool>,
    }

    impl Service<Exchange> for DelayedReadyService {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // `fetch_or` sets the flag and hands back its previous value.
            let already_ready = self.ready.fetch_or(true, Ordering::SeqCst);
            if !already_ready {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, ex: Exchange) -> Self::Future {
            Box::pin(async move { Ok(ex) })
        }
    }

    /// The pipeline's readiness mirrors its first step: pending, then ready.
    #[test]
    fn test_pipeline_poll_ready_delegates_to_first_step() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let flag = Arc::new(AtomicBool::new(false));
        let step = BoxProcessor::new(DelayedReadyService { ready: flag });
        let mut pipeline = SequentialPipeline { steps: vec![step] };

        let first_poll = pipeline.poll_ready(&mut cx);
        assert!(
            first_poll.is_pending(),
            "expected Pending on first poll_ready"
        );

        let second_poll = pipeline.poll_ready(&mut cx);
        assert!(
            second_poll.is_ready(),
            "expected Ready on second poll_ready"
        );
    }

    /// A pipeline without steps is ready immediately.
    #[test]
    fn test_pipeline_poll_ready_with_empty_steps() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut pipeline = SequentialPipeline { steps: Vec::new() };

        let outcome = pipeline.poll_ready(&mut cx);
        assert!(outcome.is_ready(), "expected Ready for empty pipeline");
    }

    /// A step failing with `CamelError::Stopped` ends the pipeline: the error
    /// is returned and the following step never runs.
    #[tokio::test]
    async fn test_pipeline_stops_gracefully_on_stopped_error() {
        let after_called = Arc::new(AtomicBool::new(false));
        let flag_for_step = Arc::clone(&after_called);

        let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
        let after_step = BoxProcessor::from_fn(move |ex| {
            flag_for_step.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let mut pipeline = SequentialPipeline {
            steps: vec![stop_step, after_step],
        };

        let exchange = Exchange::new(camel_api::Message::new("hello"));
        let result = pipeline.call(exchange).await;

        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );

        let later_step_ran = after_called.load(Ordering::SeqCst);
        assert!(!later_step_ran, "step after stop should not be called");
    }

    /// The `Multicast` variant can be constructed and matched.
    #[test]
    fn test_builder_step_multicast_variant() {
        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    /// Auto-startup and startup order keep their defaults when only the id is set.
    #[test]
    fn test_route_definition_defaults() {
        let definition = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");

        assert_eq!(definition.route_id(), "test-route");
        assert!(definition.auto_startup());
        assert_eq!(definition.startup_order(), 1000);
    }

    /// Every builder setter is reflected by its getter.
    #[test]
    fn test_route_definition_builders() {
        let definition = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);

        assert_eq!(definition.route_id(), "my-route");
        assert!(!definition.auto_startup());
        assert_eq!(definition.startup_order(), 50);
    }

    /// The `Debug` rendering of a `Choice` step names the variant.
    #[test]
    fn test_choice_builder_step_debug() {
        fn always_true(_: &Exchange) -> bool {
            true
        }

        let when = WhenStep {
            predicate: Arc::new(always_true) as FilterPredicate,
            steps: vec![BuilderStep::To("mock:a".into())],
        };
        let step = BuilderStep::Choice {
            whens: vec![when],
            otherwise: None,
        };

        let rendered = format!("{step:?}");
        assert!(rendered.contains("Choice"));
    }
}