1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{BoxProcessor, CamelError, ProducerContext, RouteController, RouteStatus};
17use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
18use camel_endpoint::parse_uri;
19use camel_processor::circuit_breaker::CircuitBreakerLayer;
20use camel_processor::error_handler::ErrorHandlerLayer;
21use camel_processor::{ChoiceService, WhenClause};
22
23use crate::registry::Registry;
24use crate::route::{BuilderStep, RouteDefinition, RouteDefinitionInfo, compose_pipeline};
25use arc_swap::ArcSwap;
26
27pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
42unsafe impl Sync for SyncBoxProcessor {}
43
44type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
45
46struct ManagedRoute {
48 definition: RouteDefinitionInfo,
50 from_uri: String,
52 pipeline: SharedPipeline,
54 concurrency: Option<ConcurrencyModel>,
56 status: RouteStatus,
58 consumer_handle: Option<JoinHandle<()>>,
60 pipeline_handle: Option<JoinHandle<()>>,
62 cancel_token: CancellationToken,
64}
65
66async fn ready_with_backoff(
73 pipeline: &mut BoxProcessor,
74 cancel: &CancellationToken,
75) -> Result<(), CamelError> {
76 loop {
77 match pipeline.ready().await {
78 Ok(_) => return Ok(()),
79 Err(CamelError::CircuitOpen(ref msg)) => {
80 warn!("Circuit open, backing off: {msg}");
81 tokio::select! {
82 _ = tokio::time::sleep(Duration::from_secs(1)) => {
83 continue;
84 }
85 _ = cancel.cancelled() => {
86 return Err(CamelError::CircuitOpen(msg.clone()));
88 }
89 }
90 }
91 Err(e) => {
92 error!("Pipeline not ready: {e}");
93 return Err(e);
94 }
95 }
96 }
97}
98
99pub struct DefaultRouteController {
107 routes: HashMap<String, ManagedRoute>,
109 registry: Arc<std::sync::Mutex<Registry>>,
111 self_ref: Option<Arc<Mutex<dyn RouteController>>>,
114 global_error_handler: Option<ErrorHandlerConfig>,
116}
117
118impl DefaultRouteController {
119 pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
121 Self {
122 routes: HashMap::new(),
123 registry,
124 self_ref: None,
125 global_error_handler: None,
126 }
127 }
128
129 pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
133 self.self_ref = Some(self_ref);
134 }
135
136 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
138 self.global_error_handler = Some(config);
139 }
140
141 fn resolve_error_handler(
143 &self,
144 config: ErrorHandlerConfig,
145 producer_ctx: &ProducerContext,
146 registry: &Registry,
147 ) -> Result<ErrorHandlerLayer, CamelError> {
148 let dlc_producer = if let Some(ref uri) = config.dlc_uri {
150 let parsed = parse_uri(uri)?;
151 let component = registry.get_or_err(&parsed.scheme)?;
152 let endpoint = component.create_endpoint(uri)?;
153 Some(endpoint.create_producer(producer_ctx)?)
154 } else {
155 None
156 };
157
158 let mut resolved_policies = Vec::new();
160 for policy in config.policies {
161 let handler_producer = if let Some(ref uri) = policy.handled_by {
162 let parsed = parse_uri(uri)?;
163 let component = registry.get_or_err(&parsed.scheme)?;
164 let endpoint = component.create_endpoint(uri)?;
165 Some(endpoint.create_producer(producer_ctx)?)
166 } else {
167 None
168 };
169 resolved_policies.push((policy, handler_producer));
170 }
171
172 Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
173 }
174
175 fn resolve_steps(
177 &self,
178 steps: Vec<BuilderStep>,
179 producer_ctx: &ProducerContext,
180 registry: &Registry,
181 ) -> Result<Vec<BoxProcessor>, CamelError> {
182 let mut processors: Vec<BoxProcessor> = Vec::new();
183 for step in steps {
184 match step {
185 BuilderStep::Processor(svc) => {
186 processors.push(svc);
187 }
188 BuilderStep::To(uri) => {
189 let parsed = parse_uri(&uri)?;
190 let component = registry.get_or_err(&parsed.scheme)?;
191 let endpoint = component.create_endpoint(&uri)?;
192 let producer = endpoint.create_producer(producer_ctx)?;
193 processors.push(producer);
194 }
195 BuilderStep::Split { config, steps } => {
196 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
197 let sub_pipeline = compose_pipeline(sub_processors);
198 let splitter =
199 camel_processor::splitter::SplitterService::new(config, sub_pipeline);
200 processors.push(BoxProcessor::new(splitter));
201 }
202 BuilderStep::Aggregate { config } => {
203 let svc = camel_processor::AggregatorService::new(config);
204 processors.push(BoxProcessor::new(svc));
205 }
206 BuilderStep::Filter { predicate, steps } => {
207 let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
208 let sub_pipeline = compose_pipeline(sub_processors);
209 let svc =
210 camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
211 processors.push(BoxProcessor::new(svc));
212 }
213 BuilderStep::Choice { whens, otherwise } => {
214 let mut when_clauses = Vec::new();
216 for when_step in whens {
217 let sub_processors =
218 self.resolve_steps(when_step.steps, producer_ctx, registry)?;
219 let pipeline = compose_pipeline(sub_processors);
220 when_clauses.push(WhenClause {
221 predicate: when_step.predicate,
222 pipeline,
223 });
224 }
225 let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
227 let sub_processors =
228 self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
229 Some(compose_pipeline(sub_processors))
230 } else {
231 None
232 };
233 let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
234 processors.push(BoxProcessor::new(svc));
235 }
236 BuilderStep::WireTap { uri } => {
237 let parsed = parse_uri(&uri)?;
238 let component = registry.get_or_err(&parsed.scheme)?;
239 let endpoint = component.create_endpoint(&uri)?;
240 let producer = endpoint.create_producer(producer_ctx)?;
241 let svc = camel_processor::WireTapService::new(producer);
242 processors.push(BoxProcessor::new(svc));
243 }
244 BuilderStep::Multicast { config, steps } => {
245 let mut endpoints = Vec::new();
247 for step in steps {
248 let sub_processors =
249 self.resolve_steps(vec![step], producer_ctx, registry)?;
250 let endpoint = compose_pipeline(sub_processors);
251 endpoints.push(endpoint);
252 }
253 let svc = camel_processor::MulticastService::new(endpoints, config);
254 processors.push(BoxProcessor::new(svc));
255 }
256 }
257 }
258 Ok(processors)
259 }
260
261 pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
271 let route_id = definition.route_id().to_string();
272
273 if self.routes.contains_key(&route_id) {
274 return Err(CamelError::RouteError(format!(
275 "Route '{}' already exists",
276 route_id
277 )));
278 }
279
280 info!(route_id = %route_id, "Adding route to controller");
281
282 let definition_info = definition.to_info();
284 let from_uri = definition.from_uri.to_string();
285 let concurrency = definition.concurrency;
286
287 let producer_ctx = self
289 .self_ref
290 .clone()
291 .map(ProducerContext::new)
292 .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
293
294 let registry = self
296 .registry
297 .lock()
298 .expect("mutex poisoned: another thread panicked while holding this lock");
299
300 let processors = self.resolve_steps(definition.steps, &producer_ctx, ®istry)?;
302 let mut pipeline = compose_pipeline(processors);
303
304 if let Some(cb_config) = definition.circuit_breaker {
306 let cb_layer = CircuitBreakerLayer::new(cb_config);
307 pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
308 }
309
310 let eh_config = definition
312 .error_handler
313 .or_else(|| self.global_error_handler.clone());
314
315 if let Some(config) = eh_config {
316 let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
317 pipeline = BoxProcessor::new(layer.layer(pipeline));
318 }
319
320 drop(registry);
322
323 self.routes.insert(
324 route_id.clone(),
325 ManagedRoute {
326 definition: definition_info,
327 from_uri,
328 pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
329 concurrency,
330 status: RouteStatus::Stopped,
331 consumer_handle: None,
332 pipeline_handle: None,
333 cancel_token: CancellationToken::new(),
334 },
335 );
336
337 Ok(())
338 }
339
340 pub fn route_count(&self) -> usize {
342 self.routes.len()
343 }
344
345 pub fn route_ids(&self) -> Vec<String> {
347 self.routes.keys().cloned().collect()
348 }
349
350 pub fn swap_pipeline(
355 &self,
356 route_id: &str,
357 new_pipeline: BoxProcessor,
358 ) -> Result<(), CamelError> {
359 let managed = self
360 .routes
361 .get(route_id)
362 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
363
364 managed
365 .pipeline
366 .store(Arc::new(SyncBoxProcessor(new_pipeline)));
367 info!(route_id = %route_id, "Pipeline swapped atomically");
368 Ok(())
369 }
370
371 pub fn route_from_uri(&self, route_id: &str) -> Option<&str> {
373 self.routes.get(route_id).map(|r| r.from_uri.as_str())
374 }
375
376 pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
381 self.routes
382 .get(route_id)
383 .map(|r| r.pipeline.load().0.clone())
384 }
385
386 async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
388 let managed = self
389 .routes
390 .get_mut(route_id)
391 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
392
393 let current_status = managed.status.clone();
394 if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
395 return Ok(()); }
397
398 info!(route_id = %route_id, "Stopping route");
399 managed.status = RouteStatus::Stopping;
400
401 managed.cancel_token.cancel();
403
404 let consumer_handle = managed.consumer_handle.take();
406 let pipeline_handle = managed.pipeline_handle.take();
407
408 let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
413 match (consumer_handle, pipeline_handle) {
414 (Some(c), Some(p)) => {
415 let _ = tokio::join!(c, p);
416 }
417 (Some(c), None) => {
418 let _ = c.await;
419 }
420 (None, Some(p)) => {
421 let _ = p.await;
422 }
423 (None, None) => {}
424 }
425 })
426 .await;
427
428 if timeout_result.is_err() {
429 warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
430 }
431
432 let managed = self
434 .routes
435 .get_mut(route_id)
436 .expect("invariant: route must exist after prior existence check");
437
438 managed.cancel_token = CancellationToken::new();
440 managed.status = RouteStatus::Stopped;
441
442 info!(route_id = %route_id, "Route stopped");
443 Ok(())
444 }
445}
446
447#[async_trait::async_trait]
448impl RouteController for DefaultRouteController {
449 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
450 {
452 let managed = self
453 .routes
454 .get_mut(route_id)
455 .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
456
457 match managed.status {
458 RouteStatus::Started => return Ok(()), RouteStatus::Starting => {
460 return Err(CamelError::RouteError(format!(
461 "Route '{}' is already starting",
462 route_id
463 )));
464 }
465 RouteStatus::Stopped | RouteStatus::Failed(_) => {} RouteStatus::Stopping => {
467 return Err(CamelError::RouteError(format!(
468 "Route '{}' is stopping",
469 route_id
470 )));
471 }
472 RouteStatus::Suspended => {} }
474 managed.status = RouteStatus::Starting;
475 }
476
477 info!(route_id = %route_id, "Starting route");
478
479 let (from_uri, pipeline, concurrency) = {
481 let managed = self
482 .routes
483 .get(route_id)
484 .expect("invariant: route must exist after prior existence check");
485 (
486 managed.from_uri.clone(),
487 Arc::clone(&managed.pipeline),
488 managed.concurrency.clone(),
489 )
490 };
491
492 let parsed = parse_uri(&from_uri)?;
494 let registry = self
495 .registry
496 .lock()
497 .expect("mutex poisoned: another thread panicked while holding this lock");
498 let component = registry.get_or_err(&parsed.scheme)?;
499 let endpoint = component.create_endpoint(&from_uri)?;
500 let mut consumer = endpoint.create_consumer()?;
501 let consumer_concurrency = consumer.concurrency_model();
502 drop(registry);
504
505 let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
507
508 let managed = self
510 .routes
511 .get_mut(route_id)
512 .expect("invariant: route must exist after prior existence check");
513
514 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
516 let child_token = managed.cancel_token.child_token();
517 let consumer_ctx = ConsumerContext::new(tx, child_token.clone());
518
519 let route_id_for_consumer = route_id.to_string();
522 let consumer_handle = tokio::spawn(async move {
523 if let Err(e) = consumer.start(consumer_ctx).await {
524 error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
525 }
526 });
527
528 let pipeline_cancel = child_token;
530 let pipeline_handle = match effective_concurrency {
531 ConcurrencyModel::Sequential => {
532 tokio::spawn(async move {
533 while let Some(envelope) = rx.recv().await {
534 let ExchangeEnvelope { exchange, reply_tx } = envelope;
535
536 let mut pipeline = pipeline.load().0.clone();
538
539 if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
540 if let Some(tx) = reply_tx {
541 let _ = tx.send(Err(e));
542 }
543 return;
544 }
545
546 let result = pipeline.call(exchange).await;
547 if let Some(tx) = reply_tx {
548 let _ = tx.send(result);
549 } else if let Err(ref e) = result
550 && !matches!(e, CamelError::Stopped)
551 {
552 error!("Pipeline error: {e}");
553 }
554 }
555 })
556 }
557 ConcurrencyModel::Concurrent { max } => {
558 let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
559 tokio::spawn(async move {
560 while let Some(envelope) = rx.recv().await {
561 let ExchangeEnvelope { exchange, reply_tx } = envelope;
562 let pipe_ref = Arc::clone(&pipeline);
563 let sem = sem.clone();
564 let cancel = pipeline_cancel.clone();
565 tokio::spawn(async move {
566 let _permit = match &sem {
568 Some(s) => Some(s.acquire().await.expect("semaphore closed")),
569 None => None,
570 };
571
572 let mut pipe = pipe_ref.load().0.clone();
574
575 if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
577 if let Some(tx) = reply_tx {
578 let _ = tx.send(Err(e));
579 }
580 return;
581 }
582
583 let result = pipe.call(exchange).await;
584 if let Some(tx) = reply_tx {
585 let _ = tx.send(result);
586 } else if let Err(ref e) = result
587 && !matches!(e, CamelError::Stopped)
588 {
589 error!("Pipeline error: {e}");
590 }
591 });
592 }
593 })
594 }
595 };
596
597 let managed = self
599 .routes
600 .get_mut(route_id)
601 .expect("invariant: route must exist after prior existence check");
602 managed.consumer_handle = Some(consumer_handle);
603 managed.pipeline_handle = Some(pipeline_handle);
604 managed.status = RouteStatus::Started;
605
606 info!(route_id = %route_id, "Route started");
607 Ok(())
608 }
609
610 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
611 self.stop_route_internal(route_id).await
612 }
613
614 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
615 self.stop_route(route_id).await?;
616 tokio::time::sleep(Duration::from_millis(100)).await;
617 self.start_route(route_id).await
618 }
619
620 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
621 self.stop_route_internal(route_id).await?;
622 let managed = self
623 .routes
624 .get_mut(route_id)
625 .expect("invariant: route must exist after prior existence check");
626 managed.status = RouteStatus::Suspended;
627 info!(route_id = %route_id, "Route suspended");
628 Ok(())
629 }
630
631 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
632 let is_suspended = self
634 .routes
635 .get(route_id)
636 .map(|r| r.status == RouteStatus::Suspended)
637 .unwrap_or(false);
638
639 if !is_suspended {
640 return Err(CamelError::RouteError(format!(
641 "Route '{}' is not suspended",
642 route_id
643 )));
644 }
645
646 self.start_route(route_id).await
647 }
648
649 fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
650 self.routes.get(route_id).map(|r| r.status.clone())
651 }
652
653 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
654 let route_ids: Vec<String> = {
657 let mut pairs: Vec<_> = self
658 .routes
659 .iter()
660 .filter(|(_, r)| r.definition.auto_startup())
661 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
662 .collect();
663 pairs.sort_by_key(|(_, order)| *order);
664 pairs.into_iter().map(|(id, _)| id).collect()
665 };
666
667 info!("Starting {} auto-startup routes", route_ids.len());
668
669 let mut errors: Vec<String> = Vec::new();
671 for route_id in route_ids {
672 if let Err(e) = self.start_route(&route_id).await {
673 errors.push(format!("Route '{}': {}", route_id, e));
674 }
675 }
676
677 if !errors.is_empty() {
678 return Err(CamelError::RouteError(format!(
679 "Failed to start routes: {}",
680 errors.join(", ")
681 )));
682 }
683
684 info!("All auto-startup routes started");
685 Ok(())
686 }
687
688 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
689 let route_ids: Vec<String> = {
691 let mut pairs: Vec<_> = self
692 .routes
693 .iter()
694 .map(|(id, r)| (id.clone(), r.definition.startup_order()))
695 .collect();
696 pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
697 pairs.into_iter().map(|(id, _)| id).collect()
698 };
699
700 info!("Stopping {} routes", route_ids.len());
701
702 for route_id in route_ids {
703 let _ = self.stop_route(&route_id).await;
704 }
705
706 info!("All routes stopped");
707 Ok(())
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714 use std::sync::Arc;
715
716 #[tokio::test]
717 async fn test_swap_pipeline_updates_stored_pipeline() {
718 use camel_api::IdentityProcessor;
719
720 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
721 let mut controller = DefaultRouteController::new(registry);
722
723 let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
724 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
725 ));
726 controller.set_self_ref(controller_arc);
727
728 let definition =
729 crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("swap-test");
730 controller.add_route(definition).unwrap();
731
732 let new_pipeline = BoxProcessor::new(IdentityProcessor);
734 let result = controller.swap_pipeline("swap-test", new_pipeline);
735 assert!(result.is_ok());
736
737 let new_pipeline = BoxProcessor::new(IdentityProcessor);
739 let result = controller.swap_pipeline("nonexistent", new_pipeline);
740 assert!(result.is_err());
741 }
742
743 #[tokio::test]
744 async fn test_add_route_duplicate_id_fails() {
745 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
746 let mut controller = DefaultRouteController::new(registry);
747
748 let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
750 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
751 ));
752 controller.set_self_ref(controller_arc);
753
754 let definition = crate::route::RouteDefinition::new("timer:tick", vec![])
755 .with_route_id("duplicate-route");
756 assert!(controller.add_route(definition).is_ok());
757
758 let definition2 = crate::route::RouteDefinition::new("timer:tock", vec![])
760 .with_route_id("duplicate-route");
761 let result = controller.add_route(definition2);
762 assert!(result.is_err());
763 let err = result.unwrap_err().to_string();
764 assert!(
765 err.contains("already exists"),
766 "error should mention 'already exists', got: {}",
767 err
768 );
769 }
770
771 #[tokio::test]
772 async fn test_add_route_with_id_succeeds() {
773 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
774 let mut controller = DefaultRouteController::new(registry);
775
776 let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
778 DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
779 ));
780 controller.set_self_ref(controller_arc);
781
782 let definition =
783 crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
784 assert!(controller.add_route(definition).is_ok());
785 assert_eq!(controller.route_count(), 1);
786 }
787}