1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{BoxProcessor, CamelError, ProducerContext, RouteController, RouteStatus};
17use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
18use camel_endpoint::parse_uri;
19use camel_processor::circuit_breaker::CircuitBreakerLayer;
20use camel_processor::error_handler::ErrorHandlerLayer;
21
22use crate::registry::Registry;
23use crate::route::{BuilderStep, RouteDefinition, RouteDefinitionInfo, compose_pipeline};
24
/// Pipeline shared between the controller and the spawned pipeline tasks.
/// A `std::sync::Mutex` (not tokio's) is sufficient because tasks only lock
/// briefly to `.clone()` the `BoxProcessor` out and never hold the guard
/// across an `.await`.
type SyncPipeline = Arc<std::sync::Mutex<BoxProcessor>>;
30
/// Runtime state the controller keeps for a single registered route.
struct ManagedRoute {
    /// Snapshot of the route definition (used for auto-startup / startup order).
    definition: RouteDefinitionInfo,
    /// URI of the endpoint this route consumes from.
    from_uri: String,
    /// Composed processor pipeline; tasks clone it out of the mutex per use.
    pipeline: SyncPipeline,
    /// Optional per-route concurrency override; when `None`, the consumer's
    /// own model is used.
    concurrency: Option<ConcurrencyModel>,
    /// Current lifecycle status.
    status: RouteStatus,
    /// Handle of the spawned consumer task, present while the route runs.
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle of the spawned pipeline-draining task, present while running.
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancels both tasks; replaced with a fresh token after every stop.
    cancel_token: CancellationToken,
}
50
51async fn ready_with_backoff(
58 pipeline: &mut BoxProcessor,
59 cancel: &CancellationToken,
60) -> Result<(), CamelError> {
61 loop {
62 match pipeline.ready().await {
63 Ok(_) => return Ok(()),
64 Err(CamelError::CircuitOpen(ref msg)) => {
65 warn!("Circuit open, backing off: {msg}");
66 tokio::select! {
67 _ = tokio::time::sleep(Duration::from_secs(1)) => {
68 continue;
69 }
70 _ = cancel.cancelled() => {
71 return Err(CamelError::CircuitOpen(msg.clone()));
73 }
74 }
75 }
76 Err(e) => {
77 error!("Pipeline not ready: {e}");
78 return Err(e);
79 }
80 }
81 }
82}
83
/// Default [`RouteController`] implementation: owns the managed routes and
/// drives their lifecycle (add / start / stop / suspend / resume).
pub struct DefaultRouteController {
    /// Managed routes keyed by route id.
    routes: HashMap<String, ManagedRoute>,
    /// Component registry used to resolve endpoints, producers, and consumers.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared handle back to this controller, used to build `ProducerContext`s.
    /// Must be set via `set_self_ref` before `add_route` is called.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Fallback error-handler config for routes that do not declare their own.
    global_error_handler: Option<ErrorHandlerConfig>,
}
102
103impl DefaultRouteController {
104 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
106 Self {
107 routes: HashMap::new(),
108 registry,
109 self_ref: None,
110 global_error_handler: None,
111 }
112 }
113
114 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
118 self.self_ref = Some(self_ref);
119 }
120
121 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
123 self.global_error_handler = Some(config);
124 }
125
126 fn resolve_error_handler(
128 &self,
129 config: ErrorHandlerConfig,
130 producer_ctx: &ProducerContext,
131 registry: &Registry,
132 ) -> Result<ErrorHandlerLayer, CamelError> {
133 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
135 let parsed = parse_uri(uri)?;
136 let component = registry.get_or_err(&parsed.scheme)?;
137 let endpoint = component.create_endpoint(uri)?;
138 Some(endpoint.create_producer(producer_ctx)?)
139 } else {
140 None
141 };
142
143 let mut resolved_policies = Vec::new();
145 for policy in config.policies {
146 let handler_producer = if let Some(ref uri) = policy.handled_by {
147 let parsed = parse_uri(uri)?;
148 let component = registry.get_or_err(&parsed.scheme)?;
149 let endpoint = component.create_endpoint(uri)?;
150 Some(endpoint.create_producer(producer_ctx)?)
151 } else {
152 None
153 };
154 resolved_policies.push((policy, handler_producer));
155 }
156
157 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
158 }
159
160 fn resolve_steps(
162 &self,
163 steps: Vec<BuilderStep>,
164 producer_ctx: &ProducerContext,
165 registry: &Registry,
166 ) -> Result<Vec<BoxProcessor>, CamelError> {
167 let mut processors: Vec<BoxProcessor> = Vec::new();
168 for step in steps {
169 match step {
170 BuilderStep::Processor(svc) => {
171 processors.push(svc);
172 }
173 BuilderStep::To(uri) => {
174 let parsed = parse_uri(&uri)?;
175 let component = registry.get_or_err(&parsed.scheme)?;
176 let endpoint = component.create_endpoint(&uri)?;
177 let producer = endpoint.create_producer(producer_ctx)?;
178 processors.push(producer);
179 }
180 BuilderStep::Split { config, steps } => {
181 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
182 let sub_pipeline = compose_pipeline(sub_processors);
183 let splitter =
184 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
185 processors.push(BoxProcessor::new(splitter));
186 }
187 BuilderStep::Aggregate { config } => {
188 let svc = camel_processor::AggregatorService::new(config);
189 processors.push(BoxProcessor::new(svc));
190 }
191 BuilderStep::Filter { predicate, steps } => {
192 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
193 let sub_pipeline = compose_pipeline(sub_processors);
194 let svc =
195 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
196 processors.push(BoxProcessor::new(svc));
197 }
198 BuilderStep::WireTap { uri } => {
199 let parsed = parse_uri(&uri)?;
200 let component = registry.get_or_err(&parsed.scheme)?;
201 let endpoint = component.create_endpoint(&uri)?;
202 let producer = endpoint.create_producer(producer_ctx)?;
203 let svc = camel_processor::WireTapService::new(producer);
204 processors.push(BoxProcessor::new(svc));
205 }
206 BuilderStep::Multicast { config, steps } => {
207 let mut endpoints = Vec::new();
209 for step in steps {
210 let sub_processors =
211 self.resolve_steps(vec![step], producer_ctx, registry)?;
212 let endpoint = compose_pipeline(sub_processors);
213 endpoints.push(endpoint);
214 }
215 let svc = camel_processor::MulticastService::new(endpoints, config);
216 processors.push(BoxProcessor::new(svc));
217 }
218 }
219 }
220 Ok(processors)
221 }
222
223 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
234 let route_id = definition
235 .route_id()
236 .ok_or_else(|| CamelError::RouteError("Route must have an ID".into()))?
237 .to_string();
238
239 if self.routes.contains_key(&route_id) {
240 return Err(CamelError::RouteError(format!(
241 "Route '{}' already exists",
242 route_id
243 )));
244 }
245
246 info!(route_id = %route_id, "Adding route to controller");
247
248 let definition_info = definition.to_info();
250 let from_uri = definition.from_uri.to_string();
251 let concurrency = definition.concurrency;
252
253 let producer_ctx = self
255 .self_ref
256 .clone()
257 .map(ProducerContext::new)
258 .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
259
260 let registry = self
262 .registry
263 .lock()
264 .expect("mutex poisoned: another thread panicked while holding this lock");
265
266 let processors = self.resolve_steps(definition.steps, &producer_ctx, ®istry)?;
268 let mut pipeline = compose_pipeline(processors);
269
270 if let Some(cb_config) = definition.circuit_breaker {
272 let cb_layer = CircuitBreakerLayer::new(cb_config);
273 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
274 }
275
276 let eh_config = definition
278 .error_handler
279 .or_else(|| self.global_error_handler.clone());
280
281 if let Some(config) = eh_config {
282 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
283 pipeline = BoxProcessor::new(layer.layer(pipeline));
284 }
285
286 drop(registry);
288
289 self.routes.insert(
290 route_id.clone(),
291 ManagedRoute {
292 definition: definition_info,
293 from_uri,
294 pipeline: Arc::new(std::sync::Mutex::new(pipeline)),
295 concurrency,
296 status: RouteStatus::Stopped,
297 consumer_handle: None,
298 pipeline_handle: None,
299 cancel_token: CancellationToken::new(),
300 },
301 );
302
303 Ok(())
304 }
305
306 pub fn route_count(&self) -> usize {
308 self.routes.len()
309 }
310
311 pub fn route_ids(&self) -> Vec<String> {
313 self.routes.keys().cloned().collect()
314 }
315
316 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
318 let managed = self
319 .routes
320 .get_mut(route_id)
321 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
322
323 let current_status = managed.status.clone();
324 if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
325 return Ok(()); }
327
328 info!(route_id = %route_id, "Stopping route");
329 managed.status = RouteStatus::Stopping;
330
331 managed.cancel_token.cancel();
333
334 let consumer_handle = managed.consumer_handle.take();
336 let pipeline_handle = managed.pipeline_handle.take();
337
338 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
343 match (consumer_handle, pipeline_handle) {
344 (Some(c), Some(p)) => {
345 let _ = tokio::join!(c, p);
346 }
347 (Some(c), None) => {
348 let _ = c.await;
349 }
350 (None, Some(p)) => {
351 let _ = p.await;
352 }
353 (None, None) => {}
354 }
355 })
356 .await;
357
358 if timeout_result.is_err() {
359 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
360 }
361
362 let managed = self
364 .routes
365 .get_mut(route_id)
366 .expect("invariant: route must exist after prior existence check");
367
368 managed.cancel_token = CancellationToken::new();
370 managed.status = RouteStatus::Stopped;
371
372 info!(route_id = %route_id, "Route stopped");
373 Ok(())
374 }
375}
376
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    /// Starts a route: creates its consumer, spawns a consumer task and a
    /// pipeline task wired together with an mpsc channel, then marks the
    /// route `Started`.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            // Gate on the current status: idempotent for Started, error for
            // in-flight transitions, proceed for Stopped/Failed/Suspended.
            match managed.status {
                RouteStatus::Started => return Ok(()),
                RouteStatus::Starting => {
                    return Err(CamelError::RouteError(format!(
                        "Route '{}' is already starting",
                        route_id
                    )));
                }
                RouteStatus::Stopped | RouteStatus::Failed(_) => {}
                RouteStatus::Stopping => {
                    return Err(CamelError::RouteError(format!(
                        "Route '{}' is stopping",
                        route_id
                    )));
                }
                RouteStatus::Suspended => {}
            }
            managed.status = RouteStatus::Starting;
            // NOTE(review): if any step below fails (URI parse, endpoint or
            // consumer creation), the status stays `Starting` and later calls
            // report "already starting" — consider resetting to Failed on
            // those error paths. TODO confirm intended behavior.
        }

        info!(route_id = %route_id, "Starting route");

        // Clone out everything the spawned tasks need so no borrow of
        // `self` is held past this scope.
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Resolve the consumer from the registry; the guard is dropped
        // before any await point.
        let parsed = parse_uri(&from_uri)?;
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock");
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(&from_uri)?;
        let mut consumer = endpoint.create_consumer()?;
        let consumer_concurrency = consumer.concurrency_model();
        drop(registry);

        // Route-level override wins; otherwise use the consumer's own model.
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Bounded channel between consumer and pipeline task provides
        // backpressure on the consumer when the pipeline lags.
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        let child_token = managed.cancel_token.child_token();
        let consumer_ctx = ConsumerContext::new(tx, child_token.clone());

        let route_id_for_consumer = route_id.to_string();
        let consumer_handle = tokio::spawn(async move {
            if let Err(e) = consumer.start(consumer_ctx).await {
                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
            }
        });

        let pipeline_cancel = child_token;
        let pipeline_handle = match effective_concurrency {
            // One task processes exchanges strictly in arrival order.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    // Clone the pipeline out of the mutex once up front; the
                    // guard is released immediately, never held across .await.
                    let mut pipeline = pipeline
                        .lock()
                        .expect("mutex poisoned: another thread panicked while holding this lock")
                        .clone();
                    while let Some(envelope) = rx.recv().await {
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // NOTE(review): a ready_with_backoff failure ends the
                        // whole pipeline task, not just this one exchange.
                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            // Request/reply exchange: forward the result.
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget: log non-shutdown errors only.
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // One spawned task per exchange, optionally bounded by a semaphore.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    while let Some(envelope) = rx.recv().await {
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_clone = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Concurrency permit (if bounded) is held for the
                            // whole task via the _permit binding.
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Per-task clone of the pipeline; guard released
                            // before any .await.
                            let mut pipe = pipe_clone.lock().expect("mutex poisoned: another thread panicked while holding this lock").clone();

                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.status = RouteStatus::Started;

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    /// Stops the route; see `stop_route_internal` for the shutdown sequence.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Stops, pauses 100 ms, and starts the route again.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    /// Suspends a route: the same teardown as stop, but the final status is
    /// `Suspended` so `resume_route` will accept it.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await?;
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.status = RouteStatus::Suspended;
        info!(route_id = %route_id, "Route suspended");
        Ok(())
    }

    /// Resumes a previously suspended route; errors for any other state,
    /// including unknown route ids.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        let is_suspended = self
            .routes
            .get(route_id)
            .map(|r| r.status == RouteStatus::Suspended)
            .unwrap_or(false);

        if !is_suspended {
            return Err(CamelError::RouteError(format!(
                "Route '{}' is not suspended",
                route_id
            )));
        }

        self.start_route(route_id).await
    }

    /// Current status of a route, or `None` if the id is unknown.
    fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
        self.routes.get(route_id).map(|r| r.status.clone())
    }

    /// Starts every auto-startup route in ascending startup order; collects
    /// per-route failures and reports them together in one error.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        // Keep going on failure so one bad route doesn't block the rest.
        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    /// Stops all routes in descending startup order, ignoring individual
    /// stop errors so every route gets a stop attempt.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
642
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a throwaway controller usable as a `self_ref` target.
    fn dummy_self_ref() -> Arc<Mutex<dyn RouteController>> {
        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
        Arc::new(Mutex::new(DefaultRouteController::new(registry)))
    }

    #[tokio::test]
    async fn test_add_route_requires_id() {
        let mut controller =
            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
        controller.set_self_ref(dummy_self_ref());

        // A definition without a route id must be rejected.
        let no_id = crate::route::RouteDefinition::new("timer:tick", vec![]);
        assert!(controller.add_route(no_id).is_err());
    }

    #[tokio::test]
    async fn test_add_route_with_id_succeeds() {
        let mut controller =
            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
        controller.set_self_ref(dummy_self_ref());

        // With an id present, registration succeeds and the route is counted.
        let with_id =
            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
        assert!(controller.add_route(with_id).is_ok());
        assert_eq!(controller.route_count(), 1);
    }
}