Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

//! Default implementation of RouteController.
//!
//! This module provides [`DefaultRouteController`], which manages route lifecycle
//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tower::{Layer, Service, ServiceExt};
14use tracing::{error, info, warn};
15
16use camel_api::UnitOfWorkConfig;
17use camel_api::aggregator::AggregatorConfig;
18use camel_api::error_handler::ErrorHandlerConfig;
19use camel_api::metrics::MetricsCollector;
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, IdentityProcessor, NoOpMetrics, ProducerContext,
22    RouteController, RuntimeCommand, RuntimeHandle,
23};
24use camel_component_api::{
25    ComponentContext, ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope,
26};
27use camel_endpoint::parse_uri;
28pub use camel_processor::aggregator::SharedLanguageRegistry;
29use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
30use camel_processor::circuit_breaker::CircuitBreakerLayer;
31use camel_processor::error_handler::ErrorHandlerLayer;
32
33use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
34use crate::lifecycle::adapters::route_compiler::{
35    compose_pipeline, compose_traced_pipeline_with_contracts,
36};
37use crate::lifecycle::application::route_definition::{
38    BuilderStep, RouteDefinition, RouteDefinitionInfo,
39};
40use crate::shared::components::domain::Registry;
41use crate::shared::observability::domain::{DetailLevel, TracerConfig};
42use arc_swap::ArcSwap;
43use camel_bean::BeanRegistry;
44
/// Notification sent when a route crashes.
///
/// Delivered on the channel registered via
/// `DefaultRouteController::set_crash_notifier`, allowing a supervisor to
/// react (restart, mark failed, alert, …).
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash (already rendered to a string).
    pub error: String,
}
53
/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
///
/// # Safety
///
/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
///
/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
///    (creates a new boxed service from the inner clone).
/// 2. The clone is owned by the calling thread and never shared.
/// 3. ArcSwap guarantees we only get & references (no &mut).
///
/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
/// clone() does not mutate shared state and each thread gets an independent copy.
///
/// NOTE(review): this argument relies on the inner service's `clone()` having
/// no interior mutability — re-verify if the tower version (or the processor
/// implementations) change.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the type-level discussion above — the only shared access is
// `&self` cloning, which yields a thread-owned copy.
unsafe impl Sync for SyncBoxProcessor {}

/// A processor pipeline shared behind `ArcSwap`, so it can be replaced
/// atomically at runtime (see `swap_pipeline`) while in-flight exchanges keep
/// using the old pipeline via their own `Arc`.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
72
/// Pipeline segments for a route whose first top-level aggregate step has a
/// timeout-based completion condition.
///
/// Such routes are split around the aggregator (see `add_route`) so the
/// aggregator's timeout can fire independently of incoming exchanges.
pub(super) struct AggregateSplitInfo {
    /// Pipeline composed from the steps before the aggregate step.
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline composed from the steps after the aggregate step.
    pub(super) post_pipeline: SharedPipeline,
}
79
/// Internal state for a managed route.
pub(super) struct ManagedRoute {
    /// The route definition metadata (for introspection).
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    pub(super) from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pub(super) pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// In-flight exchange counter. `None` when UoW is not configured for this route.
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Pre/post pipeline split, present only for routes whose first top-level
    /// aggregate step has a timeout completion condition.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Running aggregator service for this route, if one has been started.
    pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
107
108pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
109    handle.as_ref().is_some_and(|h| !h.is_finished())
110}
111
112fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
113    match (
114        handle_is_running(&managed.consumer_handle),
115        handle_is_running(&managed.pipeline_handle),
116    ) {
117        (true, true) => "Started",
118        (false, true) => "Suspended",
119        (true, false) => "Stopping",
120        (false, false) => "Stopped",
121    }
122}
123
124fn find_top_level_aggregate_with_timeout(
125    steps: &[BuilderStep],
126) -> Option<(usize, AggregatorConfig)> {
127    for (i, step) in steps.iter().enumerate() {
128        if let BuilderStep::Aggregate { config } = step {
129            if has_timeout_condition(&config.completion) {
130                return Some((i, config.clone()));
131            }
132            break;
133        }
134    }
135    None
136}
137
/// `ComponentContext` implementation backed by the controller's shared
/// registries; handed to components when resolving endpoints and languages.
pub(crate) struct ControllerComponentContext {
    /// Component registry used to resolve URI schemes to components.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Language registry used to resolve expression languages by name.
    languages: SharedLanguageRegistry,
    /// Metrics collector exposed to components.
    metrics: Arc<dyn MetricsCollector>,
}
143
144impl ControllerComponentContext {
145    pub(crate) fn new(
146        registry: Arc<std::sync::Mutex<Registry>>,
147        languages: SharedLanguageRegistry,
148        metrics: Arc<dyn MetricsCollector>,
149    ) -> Self {
150        Self {
151            registry,
152            languages,
153            metrics,
154        }
155    }
156}
157
impl ComponentContext for ControllerComponentContext {
    /// Look up a component by URI scheme.
    ///
    /// Returns `None` when the scheme is unknown — and also when the registry
    /// mutex is poisoned (`lock().ok()?` silently swallows the poison error).
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn camel_component_api::Component>> {
        self.registry.lock().ok()?.get(scheme)
    }

    /// Look up an expression language by name.
    ///
    /// As with `resolve_component`, a poisoned mutex yields `None`.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn camel_language_api::Language>> {
        self.languages.lock().ok()?.get(name).cloned()
    }

    /// The metrics collector shared with components.
    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
}
171
172fn is_pending(ex: &Exchange) -> bool {
173    ex.property("CamelAggregatorPending")
174        .and_then(|v| v.as_bool())
175        .unwrap_or(false)
176}
177
/// Wait for a pipeline service to be ready with circuit breaker backoff.
///
/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
/// cancellation checks. It returns `Ok(())` when the service is ready, or
/// `Err(e)` if cancellation occurred or a fatal error was encountered.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(CamelError::CircuitOpen(ref msg)) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        // Backoff elapsed — poll readiness again.
                        continue;
                    }
                    _ = cancel.cancelled() => {
                        // Shutting down — don't retry.
                        // NOTE(review): the last CircuitOpen error is surfaced
                        // here rather than a dedicated "cancelled" error, so
                        // callers cannot distinguish shutdown from a
                        // persistently open circuit.
                        return Err(CamelError::CircuitOpen(msg.clone()));
                    }
                }
            }
            Err(e) => {
                // Any non-circuit readiness error is treated as fatal.
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
210
211fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
212    let stamp = std::time::SystemTime::now()
213        .duration_since(std::time::UNIX_EPOCH)
214        .unwrap_or_default()
215        .as_nanos();
216    RuntimeCommand::FailRoute {
217        route_id: route_id.to_string(),
218        error: error.to_string(),
219        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
220        causation_id: None,
221    }
222}
223
224pub(super) async fn publish_runtime_failure(
225    runtime: Option<Weak<dyn RuntimeHandle>>,
226    route_id: &str,
227    error: &str,
228) {
229    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
230        return;
231    };
232    let command = runtime_failure_command(route_id, error);
233    if let Err(runtime_error) = runtime.execute(command).await {
234        warn!(
235            route_id = %route_id,
236            error = %runtime_error,
237            "failed to synchronize route crash with runtime projection"
238        );
239    }
240}
241
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as `Weak` so the controller never keeps the runtime alive.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
271
272impl DefaultRouteController {
273    /// Create a new `DefaultRouteController` with the given registry.
274    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
275        Self::with_beans(
276            registry,
277            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
278        )
279    }
280
281    /// Create a new `DefaultRouteController` with shared bean registry.
282    pub fn with_beans(
283        registry: Arc<std::sync::Mutex<Registry>>,
284        beans: Arc<std::sync::Mutex<BeanRegistry>>,
285    ) -> Self {
286        Self {
287            routes: HashMap::new(),
288            registry,
289            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
290            beans,
291            runtime: None,
292            global_error_handler: None,
293            crash_notifier: None,
294            tracing_enabled: false,
295            tracer_detail_level: DetailLevel::Minimal,
296            tracer_metrics: None,
297        }
298    }
299
300    /// Create a new `DefaultRouteController` with shared language registry.
301    pub fn with_languages(
302        registry: Arc<std::sync::Mutex<Registry>>,
303        languages: SharedLanguageRegistry,
304    ) -> Self {
305        Self {
306            routes: HashMap::new(),
307            registry,
308            languages,
309            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
310            runtime: None,
311            global_error_handler: None,
312            crash_notifier: None,
313            tracing_enabled: false,
314            tracer_detail_level: DetailLevel::Minimal,
315            tracer_metrics: None,
316        }
317    }
318
    /// Set runtime handle for ProducerContext creation.
    ///
    /// Only a `Weak` reference is stored, so the controller never keeps the
    /// runtime alive on its own; a dropped runtime simply yields producer
    /// contexts without one.
    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.runtime = Some(Arc::downgrade(&runtime));
    }

    /// Set the crash notifier for supervision.
    ///
    /// When set, the controller will send a [`CrashNotification`] whenever
    /// a consumer crashes.
    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
        self.crash_notifier = Some(tx);
    }

    /// Set a global error handler applied to all routes without a per-route handler.
    ///
    /// Takes effect for routes compiled after this call; already-built
    /// pipelines are not rebuilt.
    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.global_error_handler = Some(config);
    }

    /// Configure tracing for this route controller.
    ///
    /// Applies to pipelines compiled after this call.
    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
        self.tracing_enabled = config.enabled;
        self.tracer_detail_level = config.detail_level.clone();
        self.tracer_metrics = config.metrics_collector.clone();
    }
343
344    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
345        let mut producer_ctx = ProducerContext::new();
346        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
347            producer_ctx = producer_ctx.with_runtime(runtime);
348        }
349        Ok(producer_ctx)
350    }
351
352    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
353    fn resolve_error_handler(
354        &self,
355        config: ErrorHandlerConfig,
356        producer_ctx: &ProducerContext,
357        component_ctx: &dyn ComponentContext,
358    ) -> Result<ErrorHandlerLayer, CamelError> {
359        // Resolve DLC URI → producer.
360        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
361            let parsed = parse_uri(uri)?;
362            let component = component_ctx
363                .resolve_component(&parsed.scheme)
364                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
365            let endpoint = component.create_endpoint(uri, component_ctx)?;
366            Some(endpoint.create_producer(producer_ctx)?)
367        } else {
368            None
369        };
370
371        // Resolve per-policy `handled_by` URIs.
372        let mut resolved_policies = Vec::new();
373        for policy in config.policies {
374            let handler_producer = if let Some(ref uri) = policy.handled_by {
375                let parsed = parse_uri(uri)?;
376                let component = component_ctx
377                    .resolve_component(&parsed.scheme)
378                    .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
379                let endpoint = component.create_endpoint(uri, component_ctx)?;
380                Some(endpoint.create_producer(producer_ctx)?)
381            } else {
382                None
383            };
384            resolved_policies.push((policy, handler_producer));
385        }
386
387        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
388    }
389
    /// Resolve a `UnitOfWorkConfig` into an `(ExchangeUoWLayer, Arc<AtomicU64>)`.
    /// Returns `Err` if any hook URI cannot be resolved.
    ///
    /// `counter` lets the caller reuse an existing in-flight counter (used by
    /// hot reload so the count survives recompilation); when `None`, a fresh
    /// counter starting at zero is created and returned alongside the layer.
    fn resolve_uow_layer(
        &self,
        config: &UnitOfWorkConfig,
        producer_ctx: &ProducerContext,
        component_ctx: &dyn ComponentContext,
        counter: Option<Arc<AtomicU64>>,
    ) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
        // URI → producer for the completion/failure hook endpoints; producer
        // creation errors are wrapped with the offending URI for context.
        let resolve_uri = |uri: &str| -> Result<BoxProcessor, CamelError> {
            let parsed = parse_uri(uri)?;
            let component = component_ctx
                .resolve_component(&parsed.scheme)
                .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
            let endpoint = component.create_endpoint(uri, component_ctx)?;
            endpoint.create_producer(producer_ctx).map_err(|e| {
                CamelError::RouteError(format!("UoW hook URI '{uri}' could not be resolved: {e}"))
            })
        };

        let on_complete = config.on_complete.as_deref().map(resolve_uri).transpose()?;
        let on_failure = config.on_failure.as_deref().map(resolve_uri).transpose()?;

        let counter = counter.unwrap_or_else(|| Arc::new(AtomicU64::new(0)));
        let layer = ExchangeUoWLayer::new(Arc::clone(&counter), on_complete, on_failure);
        Ok((layer, counter))
    }
417
    /// Resolve BuilderSteps into BoxProcessors.
    ///
    /// Builds a [`ControllerComponentContext`] from the controller's shared
    /// registries (falling back to `NoOpMetrics` when no tracer metrics
    /// collector is configured) and delegates to
    /// `step_resolution::resolve_steps`. Each resolved processor is paired
    /// with an optional body-type contract consumed by
    /// `compose_traced_pipeline_with_contracts`.
    pub(crate) fn resolve_steps(
        &self,
        steps: Vec<BuilderStep>,
        producer_ctx: &ProducerContext,
        registry: &Arc<std::sync::Mutex<Registry>>,
    ) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
        let component_ctx = Arc::new(ControllerComponentContext::new(
            Arc::clone(registry),
            Arc::clone(&self.languages),
            self.tracer_metrics
                .clone()
                .unwrap_or_else(|| Arc::new(NoOpMetrics)),
        ));

        super::step_resolution::resolve_steps(
            steps,
            producer_ctx,
            registry,
            &self.languages,
            &self.beans,
            component_ctx,
        )
    }
442
    /// Add a route definition to the controller.
    ///
    /// Steps are resolved immediately using the registry. A route whose first
    /// top-level aggregate step has a timeout completion condition is split
    /// into pre/post pipelines around the aggregator (see
    /// [`AggregateSplitInfo`]); all other routes get a single composed
    /// pipeline. The pipeline is then wrapped, innermost to outermost, by the
    /// optional circuit breaker, error handler, and unit-of-work layers.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A route with the same ID already exists
    /// - Step resolution fails
    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
        let route_id = definition.route_id().to_string();

        if self.routes.contains_key(&route_id) {
            return Err(CamelError::RouteError(format!(
                "Route '{}' already exists",
                route_id
            )));
        }

        info!(route_id = %route_id, "Adding route to controller");

        // Extract definition info for storage before steps are consumed
        let definition_info = definition.to_info();
        let RouteDefinition {
            from_uri,
            steps,
            error_handler,
            circuit_breaker,
            unit_of_work,
            concurrency,
            ..
        } = definition;

        // Create ProducerContext from self_ref for step resolution
        let producer_ctx = self.build_producer_context()?;

        // Take ownership of steps before resolve_steps consumes them
        let mut aggregate_split: Option<AggregateSplitInfo> = None;
        let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
            Some((idx, agg_config)) => {
                // Split the step list around the aggregate step: [0, idx) is
                // the pre-pipeline, the step at `idx` is the aggregator itself
                // (dropped here; its config is kept in `agg_config`), and the
                // remainder forms the post-pipeline.
                let mut pre_steps = steps;
                let mut rest = pre_steps.split_off(idx);
                let _agg_step = rest.remove(0);
                let post_steps = rest;

                let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
                let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
                let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(pre_procs),
                )));

                let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
                let post_procs: Vec<BoxProcessor> =
                    post_pairs.into_iter().map(|(p, _)| p).collect();
                let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
                    compose_pipeline(post_procs),
                )));

                aggregate_split = Some(AggregateSplitInfo {
                    pre_pipeline,
                    agg_config,
                    post_pipeline,
                });

                // The main pipeline is unused for split routes; leave it
                // empty so it falls back to the identity processor below.
                vec![]
            }
            None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
        };
        let route_id_for_tracing = route_id.clone();
        let mut pipeline = if processors_with_contracts.is_empty() {
            BoxProcessor::new(IdentityProcessor)
        } else {
            compose_traced_pipeline_with_contracts(
                processors_with_contracts,
                &route_id_for_tracing,
                self.tracing_enabled,
                self.tracer_detail_level.clone(),
                self.tracer_metrics.clone(),
            )
        };

        // Apply circuit breaker if configured
        if let Some(cb_config) = circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Determine which error handler config to use (per-route takes precedence)
        let eh_config = error_handler.or_else(|| self.global_error_handler.clone());

        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost (after error handler)
        let uow_counter = if let Some(uow_config) = &unit_of_work {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            // `None` counter: a brand-new route always starts at zero in-flight.
            let (uow_layer, counter) =
                self.resolve_uow_layer(uow_config, &producer_ctx, &component_ctx, None)?;
            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
            Some(counter)
        } else {
            None
        };

        self.routes.insert(
            route_id.clone(),
            ManagedRoute {
                definition: definition_info,
                from_uri,
                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
                concurrency,
                consumer_handle: None,
                pipeline_handle: None,
                consumer_cancel_token: CancellationToken::new(),
                pipeline_cancel_token: CancellationToken::new(),
                channel_sender: None,
                in_flight: uow_counter,
                aggregate_split,
                agg_service: None,
            },
        );

        Ok(())
    }
582
    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
    ///
    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
    ///
    /// Layering matches `add_route`: circuit breaker innermost, then error
    /// handler, then unit-of-work outermost. When a UoW is configured and the
    /// route already exists, its in-flight counter is reused so hot reload
    /// does not reset the count.
    ///
    /// NOTE(review): unlike `add_route`, this path neither falls back to
    /// `IdentityProcessor` for an empty step list nor performs the
    /// aggregate/timeout split — confirm hot reload is not expected for such
    /// routes (see the warning in `swap_pipeline`).
    pub fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<BoxProcessor, CamelError> {
        let route_id = def.route_id().to_string();

        let producer_ctx = self.build_producer_context()?;

        let processors_with_contracts =
            self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
        let mut pipeline = compose_traced_pipeline_with_contracts(
            processors_with_contracts,
            &route_id,
            self.tracing_enabled,
            self.tracer_detail_level.clone(),
            self.tracer_metrics.clone(),
        );

        // Circuit breaker wraps the bare pipeline first (innermost layer).
        if let Some(cb_config) = def.circuit_breaker {
            let cb_layer = CircuitBreakerLayer::new(cb_config);
            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
        }

        // Per-route error handler takes precedence over the global one.
        let eh_config = def
            .error_handler
            .clone()
            .or_else(|| self.global_error_handler.clone());
        if let Some(config) = eh_config {
            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );
            let layer = self.resolve_error_handler(config, &producer_ctx, &component_ctx)?;
            pipeline = BoxProcessor::new(layer.layer(pipeline));
        }

        // Apply UoW layer outermost
        if let Some(uow_config) = &def.unit_of_work {
            // Reuse the live route's counter so in-flight tracking survives reload.
            let existing_counter = self
                .routes
                .get(&route_id)
                .and_then(|r| r.in_flight.as_ref().map(Arc::clone));

            let component_ctx = ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            );

            let (uow_layer, _counter) = self.resolve_uow_layer(
                uow_config,
                &producer_ctx,
                &component_ctx,
                existing_counter,
            )?;

            pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
        }

        Ok(pipeline)
    }
653
654    /// Remove a route from the controller map.
655    ///
656    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
657    /// Returns an error if the route is still running or does not exist.
658    /// Does not cancel any running tasks — call `stop_route` first.
659    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
660        let managed = self.routes.get(route_id).ok_or_else(|| {
661            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
662        })?;
663        if handle_is_running(&managed.consumer_handle)
664            || handle_is_running(&managed.pipeline_handle)
665        {
666            return Err(CamelError::RouteError(format!(
667                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
668                route_id,
669                inferred_lifecycle_label(managed)
670            )));
671        }
672        self.routes.remove(route_id);
673        info!(route_id = %route_id, "Route removed from controller");
674        Ok(())
675    }
676
    /// Returns the number of routes in the controller.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }

    /// Returns the number of in-flight exchanges for a route, or `None` if
    /// the route does not exist.
    ///
    /// Note that `Some(0)` is returned both when the route is idle and when
    /// it has no unit-of-work configured (no counter at all) — callers cannot
    /// distinguish the two cases.
    pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.routes.get(route_id).map(|r| {
            r.in_flight
                .as_ref()
                .map_or(0, |c| c.load(Ordering::Relaxed))
        })
    }

    /// Returns `true` if a route with the given ID exists.
    pub fn route_exists(&self, route_id: &str) -> bool {
        self.routes.contains_key(route_id)
    }

    /// Returns all route IDs.
    pub fn route_ids(&self) -> Vec<String> {
        self.routes.keys().cloned().collect()
    }

    /// Returns the source hash recorded in the route's definition, if any.
    pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.routes
            .get(route_id)
            .and_then(|m| m.definition.source_hash())
    }
705
706    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
707    pub fn auto_startup_route_ids(&self) -> Vec<String> {
708        let mut pairs: Vec<(String, i32)> = self
709            .routes
710            .iter()
711            .filter(|(_, managed)| managed.definition.auto_startup())
712            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
713            .collect();
714        pairs.sort_by_key(|(_, order)| *order);
715        pairs.into_iter().map(|(id, _)| id).collect()
716    }
717
718    /// Returns route IDs sorted by shutdown order (startup order descending).
719    pub fn shutdown_route_ids(&self) -> Vec<String> {
720        let mut pairs: Vec<(String, i32)> = self
721            .routes
722            .iter()
723            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
724            .collect();
725        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
726        pairs.into_iter().map(|(id, _)| id).collect()
727    }
728
729    /// Atomically swap the pipeline of a route.
730    ///
731    /// In-flight requests finish with the old pipeline (kept alive by Arc).
732    /// New requests immediately use the new pipeline.
733    pub fn swap_pipeline(
734        &self,
735        route_id: &str,
736        new_pipeline: BoxProcessor,
737    ) -> Result<(), CamelError> {
738        let managed = self
739            .routes
740            .get(route_id)
741            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
742
743        if managed.aggregate_split.is_some() {
744            tracing::warn!(
745                route_id = %route_id,
746                "swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
747            );
748        }
749
750        managed
751            .pipeline
752            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
753        info!(route_id = %route_id, "Pipeline swapped atomically");
754        Ok(())
755    }
756
    /// Returns the from_uri of a route, if it exists.
    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.routes.get(route_id).map(|r| r.from_uri.clone())
    }

    /// Get a clone of the current pipeline for a route.
    ///
    /// This is useful for testing and introspection.
    /// Returns `None` if the route doesn't exist.
    /// Cloning through the `ArcSwap` load is exactly the access pattern the
    /// `SyncBoxProcessor` safety argument relies on.
    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
        self.routes
            .get(route_id)
            .map(|r| r.pipeline.load().0.clone())
    }
771
    /// Internal stop implementation that can set custom status.
    ///
    /// Delegates to `consumer_management::stop_route_internal`, which owns the
    /// task-cancellation details.
    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
        super::consumer_management::stop_route_internal(&mut self.routes, route_id).await
    }

    /// Start a route during hot reload; currently identical to `start_route`.
    pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.start_route(route_id).await
    }

    /// Stop a route during hot reload; currently identical to `stop_route`.
    pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await
    }
784}
785
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
    /// Starts a stopped route: creates a consumer for its `from_uri`, wires an
    /// mpsc channel between consumer and pipeline, and spawns both as
    /// background tasks. Aggregate routes with a timeout take a dedicated
    /// startup path (the "aggregator v2" branch below) that splits processing
    /// into pre-pipeline -> aggregator -> post-pipeline.
    ///
    /// Errors if the route is unknown, suspended (use `resume_route`), or in
    /// an inconsistent half-running state (use `stop_route` then retry).
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check if route exists and can be started.
        // The scoped block drops the mutable borrow of `self.routes` before
        // the later re-fetches below.
        {
            let managed = self
                .routes
                .get_mut(route_id)
                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

            let consumer_running = handle_is_running(&managed.consumer_handle);
            let pipeline_running = handle_is_running(&managed.pipeline_handle);
            // Both tasks alive: already started, nothing to do.
            if consumer_running && pipeline_running {
                return Ok(());
            }
            // Pipeline alive but consumer stopped: that is the suspended state.
            if !consumer_running && pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
                    route_id
                )));
            }
            // Consumer alive without a pipeline should never happen normally.
            if consumer_running && !pipeline_running {
                return Err(CamelError::RouteError(format!(
                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
                    route_id
                )));
            }
        }

        info!(route_id = %route_id, "Starting route");

        // Get the resolved route info
        let (from_uri, pipeline, concurrency) = {
            let managed = self
                .routes
                .get(route_id)
                .expect("invariant: route must exist after prior existence check");
            (
                managed.from_uri.clone(),
                Arc::clone(&managed.pipeline),
                managed.concurrency.clone(),
            )
        };

        // Clone crash notifier for consumer task
        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Build the consumer for the route's source endpoint; this can fail
        // (e.g. unknown component), so it happens before any state mutation.
        let (consumer, consumer_concurrency) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            ),
        )?;

        // Resolve effective concurrency: route override > consumer default
        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create channel for consumer to send exchanges
        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
        // Create child tokens for independent lifecycle control
        let consumer_cancel = managed.consumer_cancel_token.child_token();
        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
        // Clone sender for storage (to reuse on resume)
        let tx_for_storage = tx.clone();
        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());

        // --- Aggregator v2: check for aggregate route with timeout ---
        // Re-fetch required: the previous `managed` borrow ended above.
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        if let Some(split) = managed.aggregate_split.as_ref() {
            // `late_tx`/`late_rx` carry exchanges completed by the aggregator's
            // timeout machinery outside the normal request path.
            let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);

            let route_cancel_clone = pipeline_cancel.clone();
            let svc = AggregatorService::new(
                split.agg_config.clone(),
                late_tx,
                Arc::clone(&self.languages),
                route_cancel_clone,
            );
            let agg = Arc::new(std::sync::Mutex::new(svc));

            // Keep a handle to the aggregator so other controller code can
            // reach it (e.g. for forced completion on stop).
            managed.agg_service = Some(Arc::clone(&agg));

            let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
            let pre_pipeline = Arc::clone(&split.pre_pipeline);
            let post_pipeline = Arc::clone(&split.post_pipeline);

            // Spawn consumer task (same as normal route)
            let consumer_handle = super::consumer_management::spawn_consumer_task(
                route_id.to_string(),
                consumer,
                consumer_ctx,
                crash_notifier,
                runtime_for_consumer,
                false,
            );

            // Spawn biased select forward loop.
            // `biased;` makes the arms poll in order: late (timed-out)
            // exchanges drain first, then fresh envelopes, then cancellation.
            let pipeline_handle = tokio::spawn(async move {
                loop {
                    tokio::select! {
                        biased;

                        late_ex = async {
                            let mut rx = late_rx.lock().await;
                            rx.recv().await
                        } => {
                            match late_ex {
                                Some(ex) => {
                                    // Timed-out aggregations only run the
                                    // post-aggregation segment of the route.
                                    let pipe = post_pipeline.load();
                                    if let Err(e) = pipe.0.clone().oneshot(ex).await {
                                        tracing::warn!(error = %e, "late exchange post-pipeline failed");
                                    }
                                }
                                None => return,
                            }
                        }

                        envelope_opt = rx.recv() => {
                            match envelope_opt {
                                Some(envelope) => {
                                    let ExchangeEnvelope { exchange, reply_tx } = envelope;
                                    // 1) Run the pre-aggregation segment.
                                    let pre_pipe = pre_pipeline.load();
                                    let ex = match pre_pipe.0.clone().oneshot(exchange).await {
                                        Ok(ex) => ex,
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                            continue;
                                        }
                                    };

                                    // 2) Feed the aggregator. The service is
                                    // cloned out of the mutex so the lock is
                                    // not held across the await below.
                                    let ex = {
                                        let cloned_svc = agg
                                            .lock()
                                            .expect("mutex poisoned: another thread panicked while holding this lock")
                                            .clone();
                                        cloned_svc.oneshot(ex).await
                                    };

                                    // 3) If the aggregation completed, run the
                                    // post segment; pending exchanges are
                                    // acknowledged back to the caller as-is.
                                    match ex {
                                        Ok(ex) => {
                                            if !is_pending(&ex) {
                                                let post_pipe = post_pipeline.load();
                                                let out = post_pipe.0.clone().oneshot(ex).await;
                                                if let Some(tx) = reply_tx { let _ = tx.send(out); }
                                            } else if let Some(tx) = reply_tx {
                                                let _ = tx.send(Ok(ex));
                                            }
                                        }
                                        Err(e) => {
                                            if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
                                        }
                                    }
                                }
                                None => return,
                            }
                        }

                        _ = pipeline_cancel.cancelled() => {
                            // On shutdown, flush all in-progress aggregations,
                            // then drain any late exchanges already queued.
                            {
                                let guard = agg
                                    .lock()
                                    .expect("mutex poisoned: another thread panicked while holding this lock");
                                guard.force_complete_all();
                            }
                            let mut rx_guard = late_rx.lock().await;
                            while let Ok(late_ex) = rx_guard.try_recv() {
                                let pipe = post_pipeline.load();
                                let _ = pipe.0.clone().oneshot(late_ex).await;
                            }
                            break;
                        }
                    }
                }
            });

            let managed = self
                .routes
                .get_mut(route_id)
                .expect("invariant: route must exist");
            managed.consumer_handle = Some(consumer_handle);
            managed.pipeline_handle = Some(pipeline_handle);
            managed.channel_sender = Some(tx_for_storage);

            info!(route_id = %route_id, "Route started (aggregate with timeout)");
            return Ok(());
        }
        // --- End aggregator v2 branch ---

        // Start consumer in background task.
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            false,
        );

        // Spawn pipeline task with its own cancellation token
        let pipeline_handle = match effective_concurrency {
            // Sequential: exchanges are processed one at a time in arrival
            // order on a single task.
            ConcurrencyModel::Sequential => {
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;

                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
                        let mut pipeline = pipeline.load().0.clone();

                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(Err(e));
                            }
                            return;
                        }

                        let result = pipeline.call(exchange).await;
                        if let Some(tx) = reply_tx {
                            let _ = tx.send(result);
                        } else if let Err(ref e) = result
                            && !matches!(e, CamelError::Stopped)
                        {
                            // Fire-and-forget exchange: nowhere to report the
                            // error, so log it (Stopped is expected on shutdown).
                            error!("Pipeline error: {e}");
                        }
                    }
                })
            }
            // Concurrent: each exchange runs on its own spawned task, with an
            // optional semaphore bounding in-flight work when `max` is set.
            ConcurrencyModel::Concurrent { max } => {
                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
                tokio::spawn(async move {
                    loop {
                        // Use select! to exit promptly on cancellation even when idle
                        let envelope = tokio::select! {
                            envelope = rx.recv() => match envelope {
                                Some(e) => e,
                                None => return, // Channel closed
                            },
                            _ = pipeline_cancel.cancelled() => {
                                // Cancellation requested - exit gracefully
                                return;
                            }
                        };
                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
                        let pipe_ref = Arc::clone(&pipeline);
                        let sem = sem.clone();
                        let cancel = pipeline_cancel.clone();
                        tokio::spawn(async move {
                            // Acquire semaphore permit if bounded
                            let _permit = match &sem {
                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
                                None => None,
                            };

                            // Load current pipeline from ArcSwap
                            let mut pipe = pipe_ref.load().0.clone();

                            // Wait for service ready with circuit breaker backoff
                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
                                if let Some(tx) = reply_tx {
                                    let _ = tx.send(Err(e));
                                }
                                return;
                            }

                            let result = pipe.call(exchange).await;
                            if let Some(tx) = reply_tx {
                                let _ = tx.send(result);
                            } else if let Err(ref e) = result
                                && !matches!(e, CamelError::Stopped)
                            {
                                error!("Pipeline error: {e}");
                            }
                        });
                    }
                })
            }
        };

        // Store handles and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);
        managed.pipeline_handle = Some(pipeline_handle);
        managed.channel_sender = Some(tx_for_storage);

        info!(route_id = %route_id, "Route started");
        Ok(())
    }

    /// Stops a route via the shared internal stop path.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route_internal(route_id).await
    }

    /// Fully restarts a route: stop, brief pause, then start.
    ///
    /// NOTE(review): the 100ms sleep presumably lets spawned tasks finish
    /// winding down before the restart — confirm whether stop_route already
    /// guarantees this.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.stop_route(route_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.start_route(route_id).await
    }

    /// Suspends a route by stopping only its consumer, leaving the pipeline
    /// task running so in-flight exchanges drain. Only valid from a fully
    /// started state; use `resume_route` to bring the consumer back.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and state.
        let managed = self
            .routes
            .get_mut(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);

        // Can only suspend from active started state.
        if !consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");

        // Cancel consumer token only (keep pipeline running)
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_cancel_token.cancel();

        // Take and join consumer handle
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        let consumer_handle = managed.consumer_handle.take();

        // Wait for consumer task to complete with timeout
        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
            if let Some(handle) = consumer_handle {
                let _ = handle.await;
            }
        })
        .await;

        // A timed-out shutdown is logged but not treated as an error: the
        // handle was already taken, so the route still counts as suspended.
        if timeout_result.is_err() {
            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
        }

        // Get the managed route again (can't hold across await)
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create fresh cancellation token for consumer (for resume)
        managed.consumer_cancel_token = CancellationToken::new();

        info!(route_id = %route_id, "Route suspended (pipeline still running)");
        Ok(())
    }

    /// Resumes a suspended route by spawning a fresh consumer that reuses the
    /// channel sender stored at startup, so the still-running pipeline keeps
    /// receiving on the same channel.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        // Check route exists and is Suspended-equivalent execution state.
        let managed = self
            .routes
            .get(route_id)
            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;

        let consumer_running = handle_is_running(&managed.consumer_handle);
        let pipeline_running = handle_is_running(&managed.pipeline_handle);
        if consumer_running || !pipeline_running {
            return Err(CamelError::RouteError(format!(
                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
                route_id,
                inferred_lifecycle_label(managed)
            )));
        }

        // Get the stored channel sender (must exist for a suspended route)
        let sender = managed.channel_sender.clone().ok_or_else(|| {
            CamelError::RouteError("Suspended route has no channel sender".into())
        })?;

        // Get from_uri and concurrency for creating new consumer
        let from_uri = managed.from_uri.clone();

        info!(route_id = %route_id, "Resuming route (spawning consumer only)");

        let (consumer, _) = super::consumer_management::create_route_consumer(
            &self.registry,
            &from_uri,
            &ControllerComponentContext::new(
                Arc::clone(&self.registry),
                Arc::clone(&self.languages),
                self.tracer_metrics
                    .clone()
                    .unwrap_or_else(|| Arc::new(NoOpMetrics)),
            ),
        )?;

        // Get the managed route for mutation
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");

        // Create child token for consumer lifecycle
        let consumer_cancel = managed.consumer_cancel_token.child_token();

        let crash_notifier = self.crash_notifier.clone();
        let runtime_for_consumer = self.runtime.clone();

        // Create ConsumerContext with the stored sender
        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());

        // Spawn consumer task
        let consumer_handle = super::consumer_management::spawn_consumer_task(
            route_id.to_string(),
            consumer,
            consumer_ctx,
            crash_notifier,
            runtime_for_consumer,
            true,
        );

        // Store consumer handle and update status
        let managed = self
            .routes
            .get_mut(route_id)
            .expect("invariant: route must exist after prior existence check");
        managed.consumer_handle = Some(consumer_handle);

        info!(route_id = %route_id, "Route resumed");
        Ok(())
    }

    /// Starts every auto-startup route in ascending `startup_order`.
    ///
    /// Failures are collected so remaining routes still get a chance to
    /// start; a single aggregated error is returned at the end.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        // Only start routes where auto_startup() == true
        // Sort by startup_order() ascending before starting
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .filter(|(_, r)| r.definition.auto_startup())
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| *order);
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Starting {} auto-startup routes", route_ids.len());

        // Collect errors but continue starting remaining routes
        let mut errors: Vec<String> = Vec::new();
        for route_id in route_ids {
            if let Err(e) = self.start_route(&route_id).await {
                errors.push(format!("Route '{}': {}", route_id, e));
            }
        }

        if !errors.is_empty() {
            return Err(CamelError::RouteError(format!(
                "Failed to start routes: {}",
                errors.join(", ")
            )));
        }

        info!("All auto-startup routes started");
        Ok(())
    }

    /// Stops all routes in descending `startup_order` (reverse of startup).
    ///
    /// Individual stop errors are deliberately ignored so shutdown always
    /// proceeds through every route.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        // Sort by startup_order descending (reverse order)
        let route_ids: Vec<String> = {
            let mut pairs: Vec<_> = self
                .routes
                .iter()
                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
                .collect();
            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
            pairs.into_iter().map(|(id, _)| id).collect()
        };

        info!("Stopping {} routes", route_ids.len());

        for route_id in route_ids {
            let _ = self.stop_route(&route_id).await;
        }

        info!("All routes stopped");
        Ok(())
    }
}
1305
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;