Skip to main content

camel_core/lifecycle/adapters/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::metrics::MetricsCollector;
17use camel_api::{
18    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
19    RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
20};
21use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
22use camel_endpoint::parse_uri;
23use camel_language_api::{Expression, Language, LanguageError, Predicate};
24use camel_processor::circuit_breaker::CircuitBreakerLayer;
25use camel_processor::error_handler::ErrorHandlerLayer;
26use camel_processor::script_mutator::ScriptMutator;
27use camel_processor::{ChoiceService, WhenClause};
28
29use crate::lifecycle::adapters::route_compiler::{compose_pipeline, compose_traced_pipeline};
30use crate::lifecycle::application::route_definition::{
31    BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
32};
33use crate::shared::components::domain::Registry;
34use crate::shared::observability::domain::{DetailLevel, TracerConfig};
35use arc_swap::ArcSwap;
36use camel_bean::BeanRegistry;
37
/// Notification sent when a route crashes.
///
/// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
/// to monitor and restart failed routes. Notifications travel over the
/// `mpsc::Sender<CrashNotification>` registered via
/// [`DefaultRouteController::set_crash_notifier`].
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// The ID of the crashed route.
    pub route_id: String,
    /// The error that caused the crash, as a human-readable string.
    pub error: String,
}
49
/// Newtype to make [`BoxProcessor`] `Sync` so it can be stored in an [`ArcSwap`].
///
/// # Safety
///
/// `BoxProcessor` (`BoxCloneService`) is `Send` but not `Sync` because the inner
/// `Box<dyn CloneServiceInner>` lacks a `Sync` bound. However:
///
/// 1. We ONLY access the wrapped `BoxProcessor` via `clone()`, which takes `&self`
///    and creates a new boxed service from the inner clone.
/// 2. The clone is owned by the calling thread and never shared.
/// 3. `ArcSwap` guarantees we only get `&` references (no `&mut`).
///
/// Therefore concurrent access to `&BoxProcessor` for cloning is treated as safe, on the
/// premise that `clone()` does not mutate shared state and each thread gets an
/// independent copy.
///
/// NOTE(review): the "clone() does not mutate shared state" premise is a property of
/// every wrapped service's `Clone` impl, and the type system does not check it here.
/// For example, a `#[derive(Clone)]` service holding a `RefCell` (which is `Send` but not
/// `Sync`) updates a non-atomic borrow counter inside `clone()`. Cloning such a service
/// from two threads at once would be a data race. Confirm that no pipeline service
/// relies on that kind of interior mutability.
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: see the `# Safety` section on `SyncBoxProcessor`. The wrapped processor is
// only ever cloned through a shared reference, and each resulting clone is owned by a
// single thread afterwards.
unsafe impl Sync for SyncBoxProcessor {}
66
/// A route's processor pipeline behind an [`ArcSwap`].
///
/// The pipeline can be replaced atomically (e.g. by `swap_pipeline` during hot-reload)
/// without `&mut` access.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
/// Languages keyed by language name, shared behind a `std::sync::Mutex`.
///
/// Used to resolve declarative language expressions and predicates.
pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
69
/// Internal trait extending [`RouteController`] with methods needed by [`CamelContext`]
/// that are not part of the public lifecycle API.
///
/// Both [`DefaultRouteController`] and the future `SupervisingRouteController` implement
/// this trait, allowing `CamelContext` to hold either as `Arc<Mutex<dyn RouteControllerInternal>>`.
///
/// NOTE(review): other docs in this module already link to `SupervisingRouteController`
/// as an existing type, so "future" above may be stale. Confirm.
///
/// The `async` methods go through `async_trait` so the trait stays usable as a trait
/// object.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
    /// Add a route definition to the controller.
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;

    /// Atomically swap the pipeline of a running route (for hot-reload).
    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;

    /// Returns the `from_uri` of a route by ID.
    ///
    /// Presumably `None` when no route with that ID is registered. Confirm in implementors.
    fn route_from_uri(&self, route_id: &str) -> Option<String>;

    /// Set a global error handler applied to all routes.
    fn set_error_handler(&mut self, config: ErrorHandlerConfig);

    /// Set the self-reference needed to create `ProducerContext`.
    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);

    /// Set runtime handle for ProducerContext command/query access.
    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);

    /// Returns the number of routes in the controller.
    fn route_count(&self) -> usize;

    /// Returns all route IDs.
    fn route_ids(&self) -> Vec<String>;

    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
    fn auto_startup_route_ids(&self) -> Vec<String>;

    /// Returns route IDs sorted by shutdown order (startup order descending).
    fn shutdown_route_ids(&self) -> Vec<String>;

    /// Configure tracing from a [`TracerConfig`].
    fn set_tracer_config(&mut self, config: &TracerConfig);

    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting it into the route map.
    /// Used by hot-reload to prepare a new pipeline for atomic swap.
    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;

    /// Remove a route from the controller map (route must be stopped first).
    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Start a route by ID (for use by hot-reload, where async_trait is required).
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;

    /// Stop a route by ID (for use by hot-reload, where async_trait is required).
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
123
/// Internal state for a managed route.
///
/// The consumer task and the pipeline task are tracked separately, each with its own
/// handle and cancellation token. This lets the consumer be stopped while the pipeline
/// keeps running (suspend). `inferred_lifecycle_label` derives a lifecycle label from
/// which of the two handles is still running.
struct ManagedRoute {
    /// The route definition metadata (for introspection).
    definition: RouteDefinitionInfo,
    /// Source endpoint URI.
    from_uri: String,
    /// Resolved processor pipeline (wrapped for atomic swap).
    pipeline: SharedPipeline,
    /// Concurrency model override (if any).
    concurrency: Option<ConcurrencyModel>,
    /// Handle for the consumer task (if running).
    consumer_handle: Option<JoinHandle<()>>,
    /// Handle for the pipeline task (if running).
    pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token for stopping the consumer task.
    /// This allows independent control of the consumer lifecycle (for suspend/resume).
    consumer_cancel_token: CancellationToken,
    /// Cancellation token for stopping the pipeline task.
    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
    pipeline_cancel_token: CancellationToken,
    /// Channel sender for sending exchanges to the pipeline.
    /// Stored to allow resuming a suspended route without recreating the channel.
    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
}
148
149fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
150    handle.as_ref().is_some_and(|h| !h.is_finished())
151}
152
153fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
154    match (
155        handle_is_running(&managed.consumer_handle),
156        handle_is_running(&managed.pipeline_handle),
157    ) {
158        (true, true) => "Started",
159        (false, true) => "Suspended",
160        (true, false) => "Stopping",
161        (false, false) => "Stopped",
162    }
163}
164
165/// Wait for a pipeline service to be ready with circuit breaker backoff.
166///
167/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
168/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
169/// cancellation checks. It returns `Ok(())` when the service is ready, or
170/// `Err(e)` if cancellation occurred or a fatal error was encountered.
171async fn ready_with_backoff(
172    pipeline: &mut BoxProcessor,
173    cancel: &CancellationToken,
174) -> Result<(), CamelError> {
175    loop {
176        match pipeline.ready().await {
177            Ok(_) => return Ok(()),
178            Err(CamelError::CircuitOpen(ref msg)) => {
179                warn!("Circuit open, backing off: {msg}");
180                tokio::select! {
181                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
182                        continue;
183                    }
184                    _ = cancel.cancelled() => {
185                        // Shutting down — don't retry.
186                        return Err(CamelError::CircuitOpen(msg.clone()));
187                    }
188                }
189            }
190            Err(e) => {
191                error!("Pipeline not ready: {e}");
192                return Err(e);
193            }
194        }
195    }
196}
197
198fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
199    let stamp = std::time::SystemTime::now()
200        .duration_since(std::time::UNIX_EPOCH)
201        .unwrap_or_default()
202        .as_nanos();
203    RuntimeCommand::FailRoute {
204        route_id: route_id.to_string(),
205        error: error.to_string(),
206        command_id: format!("ctrl-fail-{route_id}-{stamp}"),
207        causation_id: None,
208    }
209}
210
211async fn publish_runtime_failure(
212    runtime: Option<Weak<dyn RuntimeHandle>>,
213    route_id: &str,
214    error: &str,
215) {
216    let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
217        return;
218    };
219    let command = runtime_failure_command(route_id, error);
220    if let Err(runtime_error) = runtime.execute(command).await {
221        warn!(
222            route_id = %route_id,
223            error = %runtime_error,
224            "failed to synchronize route crash with runtime projection"
225        );
226    }
227}
228
/// Default implementation of [`RouteController`].
///
/// Manages route lifecycle with support for:
/// - Starting/stopping individual routes
/// - Suspending and resuming routes
/// - Auto-startup with startup ordering
/// - Graceful shutdown
///
/// It also turns route definitions into processor pipelines. Endpoint URIs are resolved
/// through the component `registry`, and declarative expressions through the
/// `languages` registry.
pub struct DefaultRouteController {
    /// Routes indexed by route ID.
    routes: HashMap<String, ManagedRoute>,
    /// Reference to the component registry for resolving endpoints.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Shared language registry for resolving declarative language expressions.
    languages: SharedLanguageRegistry,
    /// Bean registry for bean method invocation.
    beans: Arc<std::sync::Mutex<BeanRegistry>>,
    /// Self-reference to this controller.
    /// Set after construction via `set_self_ref()`; returned by `self_ref_for_supervision()`.
    ///
    /// NOTE(review): `build_producer_context` does not read this field. Confirm whether
    /// any `ProducerContext` construction still depends on it.
    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
    /// Runtime handle injected into ProducerContext for command/query operations.
    /// Held as a `Weak`, so it is only used while `upgrade()` still succeeds.
    runtime: Option<Weak<dyn RuntimeHandle>>,
    /// Optional global error handler applied to all routes without a per-route handler.
    global_error_handler: Option<ErrorHandlerConfig>,
    /// Optional crash notifier for supervision.
    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
    /// Whether tracing is enabled for route pipelines.
    tracing_enabled: bool,
    /// Detail level for tracing when enabled.
    tracer_detail_level: DetailLevel,
    /// Metrics collector for tracing processor.
    tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
261
262impl DefaultRouteController {
263    /// Create a new `DefaultRouteController` with the given registry.
264    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
265        Self::with_beans(
266            registry,
267            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
268        )
269    }
270
271    /// Create a new `DefaultRouteController` with shared bean registry.
272    pub fn with_beans(
273        registry: Arc<std::sync::Mutex<Registry>>,
274        beans: Arc<std::sync::Mutex<BeanRegistry>>,
275    ) -> Self {
276        Self {
277            routes: HashMap::new(),
278            registry,
279            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
280            beans,
281            self_ref: None,
282            runtime: None,
283            global_error_handler: None,
284            crash_notifier: None,
285            tracing_enabled: false,
286            tracer_detail_level: DetailLevel::Minimal,
287            tracer_metrics: None,
288        }
289    }
290
291    /// Create a new `DefaultRouteController` with shared language registry.
292    pub fn with_languages(
293        registry: Arc<std::sync::Mutex<Registry>>,
294        languages: SharedLanguageRegistry,
295    ) -> Self {
296        Self {
297            routes: HashMap::new(),
298            registry,
299            languages,
300            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
301            self_ref: None,
302            runtime: None,
303            global_error_handler: None,
304            crash_notifier: None,
305            tracing_enabled: false,
306            tracer_detail_level: DetailLevel::Minimal,
307            tracer_metrics: None,
308        }
309    }
310
311    /// Set the self-reference for creating ProducerContext.
312    ///
313    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
314    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
315        self.self_ref = Some(self_ref);
316    }
317
318    /// Set runtime handle for ProducerContext creation.
319    pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
320        self.runtime = Some(Arc::downgrade(&runtime));
321    }
322
323    /// Get the self-reference, if set.
324    ///
325    /// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
326    /// to spawn the supervision loop.
327    pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
328        self.self_ref.clone()
329    }
330
331    /// Get runtime handle for supervision-triggered lifecycle commands, if set.
332    pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
333        self.runtime.as_ref().and_then(Weak::upgrade)
334    }
335
336    /// Set the crash notifier for supervision.
337    ///
338    /// When set, the controller will send a [`CrashNotification`] whenever
339    /// a consumer crashes.
340    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
341        self.crash_notifier = Some(tx);
342    }
343
344    /// Set a global error handler applied to all routes without a per-route handler.
345    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
346        self.global_error_handler = Some(config);
347    }
348
349    /// Configure tracing for this route controller.
350    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
351        self.tracing_enabled = config.enabled;
352        self.tracer_detail_level = config.detail_level.clone();
353        self.tracer_metrics = config.metrics_collector.clone();
354    }
355
356    fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
357        let mut producer_ctx = ProducerContext::new();
358        if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
359            producer_ctx = producer_ctx.with_runtime(runtime);
360        }
361        Ok(producer_ctx)
362    }
363
364    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
365    fn resolve_error_handler(
366        &self,
367        config: ErrorHandlerConfig,
368        producer_ctx: &ProducerContext,
369        registry: &Registry,
370    ) -> Result<ErrorHandlerLayer, CamelError> {
371        // Resolve DLC URI → producer.
372        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
373            let parsed = parse_uri(uri)?;
374            let component = registry.get_or_err(&parsed.scheme)?;
375            let endpoint = component.create_endpoint(uri)?;
376            Some(endpoint.create_producer(producer_ctx)?)
377        } else {
378            None
379        };
380
381        // Resolve per-policy `handled_by` URIs.
382        let mut resolved_policies = Vec::new();
383        for policy in config.policies {
384            let handler_producer = if let Some(ref uri) = policy.handled_by {
385                let parsed = parse_uri(uri)?;
386                let component = registry.get_or_err(&parsed.scheme)?;
387                let endpoint = component.create_endpoint(uri)?;
388                Some(endpoint.create_producer(producer_ctx)?)
389            } else {
390                None
391            };
392            resolved_policies.push((policy, handler_producer));
393        }
394
395        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
396    }
397
398    fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
399        let guard = self
400            .languages
401            .lock()
402            .expect("mutex poisoned: another thread panicked while holding this lock");
403        guard.get(language).cloned().ok_or_else(|| {
404            CamelError::RouteError(format!(
405                "language `{language}` is not registered in CamelContext"
406            ))
407        })
408    }
409
410    fn compile_language_expression(
411        &self,
412        expression: &LanguageExpressionDef,
413    ) -> Result<Arc<dyn Expression>, CamelError> {
414        let language = self.resolve_language(&expression.language)?;
415        let compiled = language
416            .create_expression(&expression.source)
417            .map_err(|e| {
418                CamelError::RouteError(format!(
419                    "failed to compile {} expression `{}`: {e}",
420                    expression.language, expression.source
421                ))
422            })?;
423        Ok(Arc::from(compiled))
424    }
425
426    fn compile_language_predicate(
427        &self,
428        expression: &LanguageExpressionDef,
429    ) -> Result<Arc<dyn Predicate>, CamelError> {
430        let language = self.resolve_language(&expression.language)?;
431        let compiled = language.create_predicate(&expression.source).map_err(|e| {
432            CamelError::RouteError(format!(
433                "failed to compile {} predicate `{}`: {e}",
434                expression.language, expression.source
435            ))
436        })?;
437        Ok(Arc::from(compiled))
438    }
439
440    fn compile_filter_predicate(
441        &self,
442        expression: &LanguageExpressionDef,
443    ) -> Result<FilterPredicate, CamelError> {
444        let predicate = self.compile_language_predicate(expression)?;
445        Ok(Arc::new(move |exchange: &Exchange| {
446            predicate.matches(exchange).unwrap_or(false)
447        }))
448    }
449
450    fn value_to_body(value: Value) -> Body {
451        match value {
452            Value::Null => Body::Empty,
453            Value::String(text) => Body::Text(text),
454            other => Body::Json(other),
455        }
456    }
457
458    /// Resolve BuilderSteps into BoxProcessors.
459    pub(crate) fn resolve_steps(
460        &self,
461        steps: Vec<BuilderStep>,
462        producer_ctx: &ProducerContext,
463        registry: Arc<std::sync::Mutex<Registry>>,
464    ) -> Result<Vec<BoxProcessor>, CamelError> {
465        let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
466            let parsed = parse_uri(uri)?;
467            let registry_guard = registry
468                .lock()
469                .expect("mutex poisoned: another thread panicked while holding this lock");
470            let component = registry_guard.get_or_err(&parsed.scheme)?;
471            let endpoint = component.create_endpoint(uri)?;
472            endpoint.create_producer(producer_ctx)
473        };
474
475        let mut processors: Vec<BoxProcessor> = Vec::new();
476        for step in steps {
477            match step {
478                BuilderStep::Processor(svc) => {
479                    processors.push(svc);
480                }
481                BuilderStep::To(uri) => {
482                    let producer = resolve_producer(&uri)?;
483                    processors.push(producer);
484                }
485                BuilderStep::Stop => {
486                    processors.push(BoxProcessor::new(camel_processor::StopService));
487                }
488                BuilderStep::Log { level, message } => {
489                    let svc = camel_processor::LogProcessor::new(level, message);
490                    processors.push(BoxProcessor::new(svc));
491                }
492                BuilderStep::DeclarativeSetHeader { key, value } => match value {
493                    ValueSourceDef::Literal(value) => {
494                        let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
495                        processors.push(BoxProcessor::new(svc));
496                    }
497                    ValueSourceDef::Expression(expression) => {
498                        let expression = self.compile_language_expression(&expression)?;
499                        let svc = camel_processor::DynamicSetHeader::new(
500                            IdentityProcessor,
501                            key,
502                            move |exchange: &Exchange| {
503                                expression.evaluate(exchange).unwrap_or(Value::Null)
504                            },
505                        );
506                        processors.push(BoxProcessor::new(svc));
507                    }
508                },
509                BuilderStep::DeclarativeSetBody { value } => match value {
510                    ValueSourceDef::Literal(value) => {
511                        let body = Self::value_to_body(value);
512                        let svc = camel_processor::SetBody::new(
513                            IdentityProcessor,
514                            move |_exchange: &Exchange| body.clone(),
515                        );
516                        processors.push(BoxProcessor::new(svc));
517                    }
518                    ValueSourceDef::Expression(expression) => {
519                        let expression = self.compile_language_expression(&expression)?;
520                        let svc = camel_processor::SetBody::new(
521                            IdentityProcessor,
522                            move |exchange: &Exchange| {
523                                let value = expression.evaluate(exchange).unwrap_or(Value::Null);
524                                Self::value_to_body(value)
525                            },
526                        );
527                        processors.push(BoxProcessor::new(svc));
528                    }
529                },
530                BuilderStep::DeclarativeFilter { predicate, steps } => {
531                    let predicate = self.compile_filter_predicate(&predicate)?;
532                    let sub_processors =
533                        self.resolve_steps(steps, producer_ctx, registry.clone())?;
534                    let sub_pipeline = compose_pipeline(sub_processors);
535                    let svc =
536                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
537                    processors.push(BoxProcessor::new(svc));
538                }
539                BuilderStep::DeclarativeChoice { whens, otherwise } => {
540                    let mut when_clauses = Vec::new();
541                    for when_step in whens {
542                        let predicate = self.compile_filter_predicate(&when_step.predicate)?;
543                        let sub_processors =
544                            self.resolve_steps(when_step.steps, producer_ctx, registry.clone())?;
545                        let pipeline = compose_pipeline(sub_processors);
546                        when_clauses.push(WhenClause {
547                            predicate,
548                            pipeline,
549                        });
550                    }
551                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
552                        let sub_processors =
553                            self.resolve_steps(otherwise_steps, producer_ctx, registry.clone())?;
554                        Some(compose_pipeline(sub_processors))
555                    } else {
556                        None
557                    };
558                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
559                    processors.push(BoxProcessor::new(svc));
560                }
561                BuilderStep::DeclarativeScript { expression } => {
562                    let expression = self.compile_language_expression(&expression)?;
563                    let svc = camel_processor::SetBody::new(
564                        IdentityProcessor,
565                        move |exchange: &Exchange| {
566                            let value = expression.evaluate(exchange).unwrap_or(Value::Null);
567                            Self::value_to_body(value)
568                        },
569                    );
570                    processors.push(BoxProcessor::new(svc));
571                }
572                BuilderStep::Split { config, steps } => {
573                    let sub_processors =
574                        self.resolve_steps(steps, producer_ctx, registry.clone())?;
575                    let sub_pipeline = compose_pipeline(sub_processors);
576                    let splitter =
577                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
578                    processors.push(BoxProcessor::new(splitter));
579                }
580                BuilderStep::DeclarativeSplit {
581                    expression,
582                    aggregation,
583                    parallel,
584                    parallel_limit,
585                    stop_on_exception,
586                    steps,
587                } => {
588                    let lang_expr = self.compile_language_expression(&expression)?;
589                    let split_fn = move |exchange: &Exchange| {
590                        let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
591                        match value {
592                            Value::String(s) => s
593                                .lines()
594                                .filter(|line| !line.is_empty())
595                                .map(|line| {
596                                    let mut fragment = exchange.clone();
597                                    fragment.input.body = Body::from(line.to_string());
598                                    fragment
599                                })
600                                .collect(),
601                            Value::Array(arr) => arr
602                                .into_iter()
603                                .map(|v| {
604                                    let mut fragment = exchange.clone();
605                                    fragment.input.body = Body::from(v);
606                                    fragment
607                                })
608                                .collect(),
609                            _ => vec![exchange.clone()],
610                        }
611                    };
612
613                    let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
614                        .aggregation(aggregation)
615                        .parallel(parallel)
616                        .stop_on_exception(stop_on_exception);
617                    if let Some(limit) = parallel_limit {
618                        config = config.parallel_limit(limit);
619                    }
620
621                    let sub_processors =
622                        self.resolve_steps(steps, producer_ctx, registry.clone())?;
623                    let sub_pipeline = compose_pipeline(sub_processors);
624                    let splitter =
625                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
626                    processors.push(BoxProcessor::new(splitter));
627                }
628                BuilderStep::Aggregate { config } => {
629                    let svc = camel_processor::AggregatorService::new(config);
630                    processors.push(BoxProcessor::new(svc));
631                }
632                BuilderStep::Filter { predicate, steps } => {
633                    let sub_processors =
634                        self.resolve_steps(steps, producer_ctx, registry.clone())?;
635                    let sub_pipeline = compose_pipeline(sub_processors);
636                    let svc =
637                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
638                    processors.push(BoxProcessor::new(svc));
639                }
640                BuilderStep::Choice { whens, otherwise } => {
641                    // Resolve each when clause's sub-steps into a pipeline.
642                    let mut when_clauses = Vec::new();
643                    for when_step in whens {
644                        let sub_processors =
645                            self.resolve_steps(when_step.steps, producer_ctx, registry.clone())?;
646                        let pipeline = compose_pipeline(sub_processors);
647                        when_clauses.push(WhenClause {
648                            predicate: when_step.predicate,
649                            pipeline,
650                        });
651                    }
652                    // Resolve otherwise branch (if present).
653                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
654                        let sub_processors =
655                            self.resolve_steps(otherwise_steps, producer_ctx, registry.clone())?;
656                        Some(compose_pipeline(sub_processors))
657                    } else {
658                        None
659                    };
660                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
661                    processors.push(BoxProcessor::new(svc));
662                }
663                BuilderStep::WireTap { uri } => {
664                    let producer = resolve_producer(&uri)?;
665                    let svc = camel_processor::WireTapService::new(producer);
666                    processors.push(BoxProcessor::new(svc));
667                }
668                BuilderStep::Multicast { config, steps } => {
669                    // Each top-level step in the multicast scope becomes an independent endpoint.
670                    let mut endpoints = Vec::new();
671                    for step in steps {
672                        let sub_processors =
673                            self.resolve_steps(vec![step], producer_ctx, registry.clone())?;
674                        let endpoint = compose_pipeline(sub_processors);
675                        endpoints.push(endpoint);
676                    }
677                    let svc = camel_processor::MulticastService::new(endpoints, config);
678                    processors.push(BoxProcessor::new(svc));
679                }
680                BuilderStep::DeclarativeLog { level, message } => {
681                    let ValueSourceDef::Expression(expression) = message else {
682                        // Literal case is already converted to a Processor in compile.rs;
683                        // this arm should never be reached for literals.
684                        unreachable!(
685                            "DeclarativeLog with Literal should have been compiled to a Processor"
686                        );
687                    };
688                    let expression = self.compile_language_expression(&expression)?;
689                    let svc =
690                        camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
691                            expression
692                                .evaluate(exchange)
693                                .unwrap_or_else(|e| {
694                                    warn!(error = %e, "log expression evaluation failed");
695                                    Value::Null
696                                })
697                                .to_string()
698                        });
699                    processors.push(BoxProcessor::new(svc));
700                }
701                BuilderStep::Bean { name, method } => {
702                    // Lock beans registry to lookup bean
703                    let beans = self.beans.lock().expect(
704                        "beans mutex poisoned: another thread panicked while holding this lock",
705                    );
706
707                    // Lookup bean by name
708                    let bean = beans.get(&name).ok_or_else(|| {
709                        CamelError::ProcessorError(format!("Bean not found: {}", name))
710                    })?;
711
712                    // Clone Arc for async closure (release lock before async)
713                    let bean_clone = Arc::clone(&bean);
714                    let method = method.clone();
715
716                    // Create processor that invokes bean method
717                    let processor = tower::service_fn(move |mut exchange: Exchange| {
718                        let bean = Arc::clone(&bean_clone);
719                        let method = method.clone();
720
721                        async move {
722                            bean.call(&method, &mut exchange).await?;
723                            Ok(exchange)
724                        }
725                    });
726
727                    processors.push(BoxProcessor::new(processor));
728                }
729                BuilderStep::Script { language, script } => {
730                    let lang = self.resolve_language(&language)?;
731                    match lang.create_mutating_expression(&script) {
732                        Ok(mut_expr) => {
733                            processors.push(BoxProcessor::new(ScriptMutator::new(mut_expr)));
734                        }
735                        Err(LanguageError::NotSupported {
736                            feature,
737                            language: ref lang_name,
738                        }) => {
739                            return Err(CamelError::RouteError(format!(
740                                "Language '{}' does not support {} (required for .script() step)",
741                                lang_name, feature
742                            )));
743                        }
744                        Err(e) => {
745                            return Err(CamelError::RouteError(format!(
746                                "Failed to create mutating expression for language '{}': {}",
747                                language, e
748                            )));
749                        }
750                    }
751                }
752                BuilderStep::Throttle { config, steps } => {
753                    let sub_processors =
754                        self.resolve_steps(steps, producer_ctx, registry.clone())?;
755                    let sub_pipeline = compose_pipeline(sub_processors);
756                    let svc =
757                        camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
758                    processors.push(BoxProcessor::new(svc));
759                }
760                BuilderStep::LoadBalance { config, steps } => {
761                    // Each top-level step in the load_balance scope becomes an independent endpoint.
762                    let mut endpoints = Vec::new();
763                    for step in steps {
764                        let sub_processors =
765                            self.resolve_steps(vec![step], producer_ctx, registry.clone())?;
766                        let endpoint = compose_pipeline(sub_processors);
767                        endpoints.push(endpoint);
768                    }
769                    let svc =
770                        camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
771                    processors.push(BoxProcessor::new(svc));
772                }
773                BuilderStep::DynamicRouter { config } => {
774                    use camel_processor::dynamic_router::EndpointResolver;
775
776                    let producer_ctx_clone = producer_ctx.clone();
777                    let registry_clone = registry.clone();
778                    let resolver: EndpointResolver = Arc::new(move |uri: &str| {
779                        let parsed = match parse_uri(uri) {
780                            Ok(p) => p,
781                            Err(_) => return None,
782                        };
783                        let registry_guard = match registry_clone.lock() {
784                            Ok(g) => g,
785                            Err(_) => return None, // mutex poisoned
786                        };
787                        let component = match registry_guard.get_or_err(&parsed.scheme) {
788                            Ok(c) => c,
789                            Err(_) => return None,
790                        };
791                        let endpoint = match component.create_endpoint(uri) {
792                            Ok(e) => e,
793                            Err(_) => return None,
794                        };
795                        let producer = match endpoint.create_producer(&producer_ctx_clone) {
796                            Ok(p) => p,
797                            Err(_) => return None,
798                        };
799                        Some(BoxProcessor::new(producer))
800                    });
801                    let svc = camel_processor::dynamic_router::DynamicRouterService::new(
802                        config, resolver,
803                    );
804                    processors.push(BoxProcessor::new(svc));
805                }
806            }
807        }
808        Ok(processors)
809    }
810
811    /// Add a route definition to the controller.
812    ///
813    /// Steps are resolved immediately using the registry.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if:
818    /// - A route with the same ID already exists
819    /// - Step resolution fails
820    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
821        let route_id = definition.route_id().to_string();
822
823        if self.routes.contains_key(&route_id) {
824            return Err(CamelError::RouteError(format!(
825                "Route '{}' already exists",
826                route_id
827            )));
828        }
829
830        info!(route_id = %route_id, "Adding route to controller");
831
832        // Extract definition info for storage before steps are consumed
833        let definition_info = definition.to_info();
834        let from_uri = definition.from_uri.to_string();
835        let concurrency = definition.concurrency;
836
837        // Create ProducerContext from self_ref for step resolution
838        let producer_ctx = self.build_producer_context()?;
839
840        // Resolve steps into processors (takes ownership of steps)
841        let processors =
842            self.resolve_steps(definition.steps, &producer_ctx, self.registry.clone())?;
843        let route_id_for_tracing = route_id.clone();
844        let mut pipeline = compose_traced_pipeline(
845            processors,
846            &route_id_for_tracing,
847            self.tracing_enabled,
848            self.tracer_detail_level.clone(),
849            self.tracer_metrics.clone(),
850        );
851
852        // Apply circuit breaker if configured
853        if let Some(cb_config) = definition.circuit_breaker {
854            let cb_layer = CircuitBreakerLayer::new(cb_config);
855            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
856        }
857
858        // Determine which error handler config to use (per-route takes precedence)
859        let eh_config = definition
860            .error_handler
861            .or_else(|| self.global_error_handler.clone());
862
863        if let Some(config) = eh_config {
864            // Lock registry for error handler resolution
865            let registry = self
866                .registry
867                .lock()
868                .expect("mutex poisoned: another thread panicked while holding this lock");
869            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
870            pipeline = BoxProcessor::new(layer.layer(pipeline));
871        }
872
873        self.routes.insert(
874            route_id.clone(),
875            ManagedRoute {
876                definition: definition_info,
877                from_uri,
878                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
879                concurrency,
880                consumer_handle: None,
881                pipeline_handle: None,
882                consumer_cancel_token: CancellationToken::new(),
883                pipeline_cancel_token: CancellationToken::new(),
884                channel_sender: None,
885            },
886        );
887
888        Ok(())
889    }
890
891    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
892    ///
893    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
894    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
895    pub fn compile_route_definition(
896        &self,
897        def: RouteDefinition,
898    ) -> Result<BoxProcessor, CamelError> {
899        let route_id = def.route_id().to_string();
900
901        let producer_ctx = self.build_producer_context()?;
902
903        let processors = self.resolve_steps(def.steps, &producer_ctx, self.registry.clone())?;
904        let mut pipeline = compose_traced_pipeline(
905            processors,
906            &route_id,
907            self.tracing_enabled,
908            self.tracer_detail_level.clone(),
909            self.tracer_metrics.clone(),
910        );
911
912        if let Some(cb_config) = def.circuit_breaker {
913            let cb_layer = CircuitBreakerLayer::new(cb_config);
914            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
915        }
916
917        let eh_config = def
918            .error_handler
919            .or_else(|| self.global_error_handler.clone());
920        if let Some(config) = eh_config {
921            // Lock registry for error handler resolution
922            let registry = self
923                .registry
924                .lock()
925                .expect("mutex poisoned: registry lock in compile_route_definition");
926            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
927            pipeline = BoxProcessor::new(layer.layer(pipeline));
928        }
929
930        Ok(pipeline)
931    }
932
933    /// Remove a route from the controller map.
934    ///
935    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
936    /// Returns an error if the route is still running or does not exist.
937    /// Does not cancel any running tasks — call `stop_route` first.
938    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
939        let managed = self.routes.get(route_id).ok_or_else(|| {
940            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
941        })?;
942        if handle_is_running(&managed.consumer_handle)
943            || handle_is_running(&managed.pipeline_handle)
944        {
945            return Err(CamelError::RouteError(format!(
946                "Route '{}' must be stopped before removal (current execution lifecycle: {})",
947                route_id,
948                inferred_lifecycle_label(managed)
949            )));
950        }
951        self.routes.remove(route_id);
952        info!(route_id = %route_id, "Route removed from controller");
953        Ok(())
954    }
955
956    /// Returns the number of routes in the controller.
957    pub fn route_count(&self) -> usize {
958        self.routes.len()
959    }
960
961    /// Returns all route IDs.
962    pub fn route_ids(&self) -> Vec<String> {
963        self.routes.keys().cloned().collect()
964    }
965
966    /// Returns route IDs that should auto-start, sorted by startup order (ascending).
967    pub fn auto_startup_route_ids(&self) -> Vec<String> {
968        let mut pairs: Vec<(String, i32)> = self
969            .routes
970            .iter()
971            .filter(|(_, managed)| managed.definition.auto_startup())
972            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
973            .collect();
974        pairs.sort_by_key(|(_, order)| *order);
975        pairs.into_iter().map(|(id, _)| id).collect()
976    }
977
978    /// Returns route IDs sorted by shutdown order (startup order descending).
979    pub fn shutdown_route_ids(&self) -> Vec<String> {
980        let mut pairs: Vec<(String, i32)> = self
981            .routes
982            .iter()
983            .map(|(id, managed)| (id.clone(), managed.definition.startup_order()))
984            .collect();
985        pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
986        pairs.into_iter().map(|(id, _)| id).collect()
987    }
988
989    /// Atomically swap the pipeline of a route.
990    ///
991    /// In-flight requests finish with the old pipeline (kept alive by Arc).
992    /// New requests immediately use the new pipeline.
993    pub fn swap_pipeline(
994        &self,
995        route_id: &str,
996        new_pipeline: BoxProcessor,
997    ) -> Result<(), CamelError> {
998        let managed = self
999            .routes
1000            .get(route_id)
1001            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1002
1003        managed
1004            .pipeline
1005            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
1006        info!(route_id = %route_id, "Pipeline swapped atomically");
1007        Ok(())
1008    }
1009
1010    /// Returns the from_uri of a route, if it exists.
1011    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
1012        self.routes.get(route_id).map(|r| r.from_uri.clone())
1013    }
1014
1015    /// Get a clone of the current pipeline for a route.
1016    ///
1017    /// This is useful for testing and introspection.
1018    /// Returns `None` if the route doesn't exist.
1019    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
1020        self.routes
1021            .get(route_id)
1022            .map(|r| r.pipeline.load().0.clone())
1023    }
1024
1025    /// Internal stop implementation that can set custom status.
1026    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
1027        let managed = self
1028            .routes
1029            .get_mut(route_id)
1030            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1031
1032        if !handle_is_running(&managed.consumer_handle)
1033            && !handle_is_running(&managed.pipeline_handle)
1034        {
1035            return Ok(());
1036        }
1037
1038        info!(route_id = %route_id, "Stopping route");
1039
1040        // Cancel both tokens to signal shutdown for consumer and pipeline independently
1041        let managed = self
1042            .routes
1043            .get_mut(route_id)
1044            .expect("invariant: route must exist after prior existence check");
1045        managed.consumer_cancel_token.cancel();
1046        managed.pipeline_cancel_token.cancel();
1047
1048        // Take handles directly (no Arc<Mutex> wrapper needed)
1049        let managed = self
1050            .routes
1051            .get_mut(route_id)
1052            .expect("invariant: route must exist after prior existence check");
1053        let consumer_handle = managed.consumer_handle.take();
1054        let pipeline_handle = managed.pipeline_handle.take();
1055
1056        // IMPORTANT: Drop channel_sender early so rx.recv() returns None
1057        // This ensures the pipeline task can exit even if idle on recv()
1058        let managed = self
1059            .routes
1060            .get_mut(route_id)
1061            .expect("invariant: route must exist after prior existence check");
1062        managed.channel_sender = None;
1063
1064        // Wait for tasks to complete with timeout
1065        // The CancellationToken already signaled tasks to stop gracefully.
1066        // Combined with the select! in pipeline loops, this should exit quickly.
1067        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1068            match (consumer_handle, pipeline_handle) {
1069                (Some(c), Some(p)) => {
1070                    let _ = tokio::join!(c, p);
1071                }
1072                (Some(c), None) => {
1073                    let _ = c.await;
1074                }
1075                (None, Some(p)) => {
1076                    let _ = p.await;
1077                }
1078                (None, None) => {}
1079            }
1080        })
1081        .await;
1082
1083        if timeout_result.is_err() {
1084            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
1085        }
1086
1087        // Get the managed route again (can't hold across await)
1088        let managed = self
1089            .routes
1090            .get_mut(route_id)
1091            .expect("invariant: route must exist after prior existence check");
1092
1093        // Create fresh cancellation tokens for next start
1094        managed.consumer_cancel_token = CancellationToken::new();
1095        managed.pipeline_cancel_token = CancellationToken::new();
1096
1097        info!(route_id = %route_id, "Route stopped");
1098        Ok(())
1099    }
1100}
1101
1102#[async_trait::async_trait]
1103impl RouteController for DefaultRouteController {
1104    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1105        // Check if route exists and can be started.
1106        {
1107            let managed = self
1108                .routes
1109                .get_mut(route_id)
1110                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1111
1112            let consumer_running = handle_is_running(&managed.consumer_handle);
1113            let pipeline_running = handle_is_running(&managed.pipeline_handle);
1114            if consumer_running && pipeline_running {
1115                return Ok(());
1116            }
1117            if !consumer_running && pipeline_running {
1118                return Err(CamelError::RouteError(format!(
1119                    "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
1120                    route_id
1121                )));
1122            }
1123            if consumer_running && !pipeline_running {
1124                return Err(CamelError::RouteError(format!(
1125                    "Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
1126                    route_id
1127                )));
1128            }
1129        }
1130
1131        info!(route_id = %route_id, "Starting route");
1132
1133        // Get the resolved route info
1134        let (from_uri, pipeline, concurrency) = {
1135            let managed = self
1136                .routes
1137                .get(route_id)
1138                .expect("invariant: route must exist after prior existence check");
1139            (
1140                managed.from_uri.clone(),
1141                Arc::clone(&managed.pipeline),
1142                managed.concurrency.clone(),
1143            )
1144        };
1145
1146        // Clone crash notifier for consumer task
1147        let crash_notifier = self.crash_notifier.clone();
1148        let runtime_for_consumer = self.runtime.clone();
1149
1150        // Parse from URI and create consumer (lock registry for lookup)
1151        let parsed = parse_uri(&from_uri)?;
1152        let registry = self
1153            .registry
1154            .lock()
1155            .expect("mutex poisoned: another thread panicked while holding this lock");
1156        let component = registry.get_or_err(&parsed.scheme)?;
1157        let endpoint = component.create_endpoint(&from_uri)?;
1158        let mut consumer = endpoint.create_consumer()?;
1159        let consumer_concurrency = consumer.concurrency_model();
1160        // Drop the lock before spawning tasks
1161        drop(registry);
1162
1163        // Resolve effective concurrency: route override > consumer default
1164        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1165
1166        // Get the managed route for mutation
1167        let managed = self
1168            .routes
1169            .get_mut(route_id)
1170            .expect("invariant: route must exist after prior existence check");
1171
1172        // Create channel for consumer to send exchanges
1173        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1174        // Create child tokens for independent lifecycle control
1175        let consumer_cancel = managed.consumer_cancel_token.child_token();
1176        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1177        // Clone sender for storage (to reuse on resume)
1178        let tx_for_storage = tx.clone();
1179        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1180
1181        // Start consumer in background task.
1182        let route_id_for_consumer = route_id.to_string();
1183        let consumer_handle = tokio::spawn(async move {
1184            if let Err(e) = consumer.start(consumer_ctx).await {
1185                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1186                let error_msg = e.to_string();
1187
1188                // Send crash notification if notifier is configured
1189                if let Some(tx) = crash_notifier {
1190                    let _ = tx
1191                        .send(CrashNotification {
1192                            route_id: route_id_for_consumer.clone(),
1193                            error: error_msg.clone(),
1194                        })
1195                        .await;
1196                }
1197
1198                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1199                    .await;
1200            }
1201        });
1202
1203        // Spawn pipeline task with its own cancellation token
1204        let pipeline_handle = match effective_concurrency {
1205            ConcurrencyModel::Sequential => {
1206                tokio::spawn(async move {
1207                    loop {
1208                        // Use select! to exit promptly on cancellation even when idle
1209                        let envelope = tokio::select! {
1210                            envelope = rx.recv() => match envelope {
1211                                Some(e) => e,
1212                                None => return, // Channel closed
1213                            },
1214                            _ = pipeline_cancel.cancelled() => {
1215                                // Cancellation requested - exit gracefully
1216                                return;
1217                            }
1218                        };
1219                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1220
1221                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1222                        let mut pipeline = pipeline.load().0.clone();
1223
1224                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1225                            if let Some(tx) = reply_tx {
1226                                let _ = tx.send(Err(e));
1227                            }
1228                            return;
1229                        }
1230
1231                        let result = pipeline.call(exchange).await;
1232                        if let Some(tx) = reply_tx {
1233                            let _ = tx.send(result);
1234                        } else if let Err(ref e) = result
1235                            && !matches!(e, CamelError::Stopped)
1236                        {
1237                            error!("Pipeline error: {e}");
1238                        }
1239                    }
1240                })
1241            }
1242            ConcurrencyModel::Concurrent { max } => {
1243                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1244                tokio::spawn(async move {
1245                    loop {
1246                        // Use select! to exit promptly on cancellation even when idle
1247                        let envelope = tokio::select! {
1248                            envelope = rx.recv() => match envelope {
1249                                Some(e) => e,
1250                                None => return, // Channel closed
1251                            },
1252                            _ = pipeline_cancel.cancelled() => {
1253                                // Cancellation requested - exit gracefully
1254                                return;
1255                            }
1256                        };
1257                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1258                        let pipe_ref = Arc::clone(&pipeline);
1259                        let sem = sem.clone();
1260                        let cancel = pipeline_cancel.clone();
1261                        tokio::spawn(async move {
1262                            // Acquire semaphore permit if bounded
1263                            let _permit = match &sem {
1264                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1265                                None => None,
1266                            };
1267
1268                            // Load current pipeline from ArcSwap
1269                            let mut pipe = pipe_ref.load().0.clone();
1270
1271                            // Wait for service ready with circuit breaker backoff
1272                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1273                                if let Some(tx) = reply_tx {
1274                                    let _ = tx.send(Err(e));
1275                                }
1276                                return;
1277                            }
1278
1279                            let result = pipe.call(exchange).await;
1280                            if let Some(tx) = reply_tx {
1281                                let _ = tx.send(result);
1282                            } else if let Err(ref e) = result
1283                                && !matches!(e, CamelError::Stopped)
1284                            {
1285                                error!("Pipeline error: {e}");
1286                            }
1287                        });
1288                    }
1289                })
1290            }
1291        };
1292
1293        // Store handles and update status
1294        let managed = self
1295            .routes
1296            .get_mut(route_id)
1297            .expect("invariant: route must exist after prior existence check");
1298        managed.consumer_handle = Some(consumer_handle);
1299        managed.pipeline_handle = Some(pipeline_handle);
1300        managed.channel_sender = Some(tx_for_storage);
1301
1302        info!(route_id = %route_id, "Route started");
1303        Ok(())
1304    }
1305
1306    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1307        self.stop_route_internal(route_id).await
1308    }
1309
1310    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1311        self.stop_route(route_id).await?;
1312        tokio::time::sleep(Duration::from_millis(100)).await;
1313        self.start_route(route_id).await
1314    }
1315
1316    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1317        // Check route exists and state.
1318        let managed = self
1319            .routes
1320            .get_mut(route_id)
1321            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1322
1323        let consumer_running = handle_is_running(&managed.consumer_handle);
1324        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1325
1326        // Can only suspend from active started state.
1327        if !consumer_running || !pipeline_running {
1328            return Err(CamelError::RouteError(format!(
1329                "Cannot suspend route '{}' with execution lifecycle {}",
1330                route_id,
1331                inferred_lifecycle_label(managed)
1332            )));
1333        }
1334
1335        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1336
1337        // Cancel consumer token only (keep pipeline running)
1338        let managed = self
1339            .routes
1340            .get_mut(route_id)
1341            .expect("invariant: route must exist after prior existence check");
1342        managed.consumer_cancel_token.cancel();
1343
1344        // Take and join consumer handle
1345        let managed = self
1346            .routes
1347            .get_mut(route_id)
1348            .expect("invariant: route must exist after prior existence check");
1349        let consumer_handle = managed.consumer_handle.take();
1350
1351        // Wait for consumer task to complete with timeout
1352        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1353            if let Some(handle) = consumer_handle {
1354                let _ = handle.await;
1355            }
1356        })
1357        .await;
1358
1359        if timeout_result.is_err() {
1360            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1361        }
1362
1363        // Get the managed route again (can't hold across await)
1364        let managed = self
1365            .routes
1366            .get_mut(route_id)
1367            .expect("invariant: route must exist after prior existence check");
1368
1369        // Create fresh cancellation token for consumer (for resume)
1370        managed.consumer_cancel_token = CancellationToken::new();
1371
1372        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1373        Ok(())
1374    }
1375
1376    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1377        // Check route exists and is Suspended-equivalent execution state.
1378        let managed = self
1379            .routes
1380            .get(route_id)
1381            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1382
1383        let consumer_running = handle_is_running(&managed.consumer_handle);
1384        let pipeline_running = handle_is_running(&managed.pipeline_handle);
1385        if consumer_running || !pipeline_running {
1386            return Err(CamelError::RouteError(format!(
1387                "Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
1388                route_id,
1389                inferred_lifecycle_label(managed)
1390            )));
1391        }
1392
1393        // Get the stored channel sender (must exist for a suspended route)
1394        let sender = managed.channel_sender.clone().ok_or_else(|| {
1395            CamelError::RouteError("Suspended route has no channel sender".into())
1396        })?;
1397
1398        // Get from_uri and concurrency for creating new consumer
1399        let from_uri = managed.from_uri.clone();
1400
1401        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1402
1403        // Parse from URI and create consumer (lock registry for lookup)
1404        let parsed = parse_uri(&from_uri)?;
1405        let registry = self
1406            .registry
1407            .lock()
1408            .expect("mutex poisoned: another thread panicked while holding this lock");
1409        let component = registry.get_or_err(&parsed.scheme)?;
1410        let endpoint = component.create_endpoint(&from_uri)?;
1411        let mut consumer = endpoint.create_consumer()?;
1412        // Drop the lock before spawning tasks
1413        drop(registry);
1414
1415        // Get the managed route for mutation
1416        let managed = self
1417            .routes
1418            .get_mut(route_id)
1419            .expect("invariant: route must exist after prior existence check");
1420
1421        // Create child token for consumer lifecycle
1422        let consumer_cancel = managed.consumer_cancel_token.child_token();
1423
1424        let crash_notifier = self.crash_notifier.clone();
1425        let runtime_for_consumer = self.runtime.clone();
1426
1427        // Create ConsumerContext with the stored sender
1428        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1429
1430        // Spawn consumer task
1431        let route_id_for_consumer = route_id.to_string();
1432        let consumer_handle = tokio::spawn(async move {
1433            if let Err(e) = consumer.start(consumer_ctx).await {
1434                error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1435                let error_msg = e.to_string();
1436
1437                // Send crash notification if notifier is configured
1438                if let Some(tx) = crash_notifier {
1439                    let _ = tx
1440                        .send(CrashNotification {
1441                            route_id: route_id_for_consumer.clone(),
1442                            error: error_msg.clone(),
1443                        })
1444                        .await;
1445                }
1446
1447                publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
1448                    .await;
1449            }
1450        });
1451
1452        // Store consumer handle and update status
1453        let managed = self
1454            .routes
1455            .get_mut(route_id)
1456            .expect("invariant: route must exist after prior existence check");
1457        managed.consumer_handle = Some(consumer_handle);
1458
1459        info!(route_id = %route_id, "Route resumed");
1460        Ok(())
1461    }
1462
1463    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1464        // Only start routes where auto_startup() == true
1465        // Sort by startup_order() ascending before starting
1466        let route_ids: Vec<String> = {
1467            let mut pairs: Vec<_> = self
1468                .routes
1469                .iter()
1470                .filter(|(_, r)| r.definition.auto_startup())
1471                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1472                .collect();
1473            pairs.sort_by_key(|(_, order)| *order);
1474            pairs.into_iter().map(|(id, _)| id).collect()
1475        };
1476
1477        info!("Starting {} auto-startup routes", route_ids.len());
1478
1479        // Collect errors but continue starting remaining routes
1480        let mut errors: Vec<String> = Vec::new();
1481        for route_id in route_ids {
1482            if let Err(e) = self.start_route(&route_id).await {
1483                errors.push(format!("Route '{}': {}", route_id, e));
1484            }
1485        }
1486
1487        if !errors.is_empty() {
1488            return Err(CamelError::RouteError(format!(
1489                "Failed to start routes: {}",
1490                errors.join(", ")
1491            )));
1492        }
1493
1494        info!("All auto-startup routes started");
1495        Ok(())
1496    }
1497
1498    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1499        // Sort by startup_order descending (reverse order)
1500        let route_ids: Vec<String> = {
1501            let mut pairs: Vec<_> = self
1502                .routes
1503                .iter()
1504                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1505                .collect();
1506            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1507            pairs.into_iter().map(|(id, _)| id).collect()
1508        };
1509
1510        info!("Stopping {} routes", route_ids.len());
1511
1512        for route_id in route_ids {
1513            let _ = self.stop_route(&route_id).await;
1514        }
1515
1516        info!("All routes stopped");
1517        Ok(())
1518    }
1519}
1520
1521#[async_trait::async_trait]
1522impl RouteControllerInternal for DefaultRouteController {
1523    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1524        DefaultRouteController::add_route(self, def)
1525    }
1526
1527    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1528        DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1529    }
1530
1531    fn route_from_uri(&self, route_id: &str) -> Option<String> {
1532        // Call the inherent method which now returns Option<String>
1533        DefaultRouteController::route_from_uri(self, route_id)
1534    }
1535
1536    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1537        DefaultRouteController::set_error_handler(self, config)
1538    }
1539
1540    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1541        DefaultRouteController::set_self_ref(self, self_ref)
1542    }
1543
1544    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
1545        DefaultRouteController::set_runtime_handle(self, runtime)
1546    }
1547
1548    fn route_count(&self) -> usize {
1549        DefaultRouteController::route_count(self)
1550    }
1551
1552    fn route_ids(&self) -> Vec<String> {
1553        DefaultRouteController::route_ids(self)
1554    }
1555
1556    fn auto_startup_route_ids(&self) -> Vec<String> {
1557        DefaultRouteController::auto_startup_route_ids(self)
1558    }
1559
1560    fn shutdown_route_ids(&self) -> Vec<String> {
1561        DefaultRouteController::shutdown_route_ids(self)
1562    }
1563
1564    fn set_tracer_config(&mut self, config: &TracerConfig) {
1565        DefaultRouteController::set_tracer_config(self, config)
1566    }
1567
1568    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1569        DefaultRouteController::compile_route_definition(self, def)
1570    }
1571
1572    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1573        DefaultRouteController::remove_route(self, route_id)
1574    }
1575
1576    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1577        DefaultRouteController::start_route(self, route_id).await
1578    }
1579
1580    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1581        DefaultRouteController::stop_route(self, route_id).await
1582    }
1583}
1584
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: `RouteControllerInternal` must remain usable as a
    /// trait object. Adding a non-object-safe item to the trait makes this test
    /// fail to compile.
    #[test]
    fn test_route_controller_internal_is_object_safe() {
        fn takes_trait_object(controller: Option<Box<dyn RouteControllerInternal>>) -> bool {
            controller.is_none()
        }
        assert!(takes_trait_object(None));
    }
}