Skip to main content

camel_core/
route_controller.rs

1//! Default implementation of RouteController.
2//!
3//! This module provides [`DefaultRouteController`], which manages route lifecycle
4//! including starting, stopping, suspending, and resuming routes.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{Mutex, mpsc};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tower::{Layer, Service, ServiceExt};
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{
17    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
18    RouteController, RouteStatus, Value, body::Body,
19};
20use camel_component::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
21use camel_endpoint::parse_uri;
22use camel_language_api::{Expression, Language, Predicate};
23use camel_processor::circuit_breaker::CircuitBreakerLayer;
24use camel_processor::error_handler::ErrorHandlerLayer;
25use camel_processor::{ChoiceService, WhenClause};
26
27use crate::config::{DetailLevel, TracerConfig};
28use crate::registry::Registry;
29use crate::route::{
30    BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
31    compose_pipeline, compose_traced_pipeline,
32};
33use arc_swap::ArcSwap;
34use camel_bean::BeanRegistry;
35
36/// Notification sent when a route crashes.
37///
38/// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
39/// to monitor and restart failed routes.
40#[derive(Debug, Clone)]
41pub struct CrashNotification {
42    /// The ID of the crashed route.
43    pub route_id: String,
44    /// The error that caused the crash.
45    pub error: String,
46}
47
48/// Newtype to make BoxProcessor Sync-safe for ArcSwap.
49///
50/// # Safety
51///
52/// BoxProcessor (BoxCloneService) is Send but not Sync because the inner
53/// Box<dyn CloneServiceInner> lacks a Sync bound. However:
54///
55/// 1. We ONLY access BoxProcessor via clone(), which is a read-only operation
56///    (creates a new boxed service from the inner clone).
57/// 2. The clone is owned by the calling thread and never shared.
58/// 3. ArcSwap guarantees we only get & references (no &mut).
59///
60/// Therefore, concurrent access to &BoxProcessor for cloning is safe because
61/// clone() does not mutate shared state and each thread gets an independent copy.
62pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
63unsafe impl Sync for SyncBoxProcessor {}
64
65type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
66pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
67
68/// Internal trait extending [`RouteController`] with methods needed by [`CamelContext`]
69/// that are not part of the public lifecycle API.
70///
71/// Both [`DefaultRouteController`] and the future `SupervisingRouteController` implement
72/// this trait, allowing `CamelContext` to hold either as `Arc<Mutex<dyn RouteControllerInternal>>`.
73#[async_trait::async_trait]
74pub trait RouteControllerInternal: RouteController + Send {
75    /// Add a route definition to the controller.
76    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;
77
78    /// Atomically swap the pipeline of a running route (for hot-reload).
79    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;
80
81    /// Returns the `from_uri` of a route by ID.
82    fn route_from_uri(&self, route_id: &str) -> Option<String>;
83
84    /// Set a global error handler applied to all routes.
85    fn set_error_handler(&mut self, config: ErrorHandlerConfig);
86
87    /// Set the self-reference needed to create `ProducerContext`.
88    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);
89
90    /// Returns the number of routes in the controller.
91    fn route_count(&self) -> usize;
92
93    /// Returns all route IDs.
94    fn route_ids(&self) -> Vec<String>;
95
96    /// Configure tracing from a [`TracerConfig`].
97    fn set_tracer_config(&mut self, config: &TracerConfig);
98
99    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting it into the route map.
100    /// Used by hot-reload to prepare a new pipeline for atomic swap.
101    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;
102
103    /// Remove a route from the controller map (route must be stopped first).
104    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;
105
106    /// Start a route by ID (for use by hot-reload, where async_trait is required).
107    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
108
109    /// Stop a route by ID (for use by hot-reload, where async_trait is required).
110    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
111}
112
113/// Internal state for a managed route.
114struct ManagedRoute {
115    /// The route definition metadata (for introspection).
116    definition: RouteDefinitionInfo,
117    /// Source endpoint URI.
118    from_uri: String,
119    /// Resolved processor pipeline (wrapped for atomic swap).
120    pipeline: SharedPipeline,
121    /// Concurrency model override (if any).
122    concurrency: Option<ConcurrencyModel>,
123    /// Shared lifecycle status — written by both the controller and spawned consumer tasks.
124    status: Arc<std::sync::Mutex<RouteStatus>>,
125    /// Handle for the consumer task (if running).
126    consumer_handle: Option<JoinHandle<()>>,
127    /// Handle for the pipeline task (if running).
128    pipeline_handle: Option<JoinHandle<()>>,
129    /// Cancellation token for stopping the consumer task.
130    /// This allows independent control of the consumer lifecycle (for suspend/resume).
131    consumer_cancel_token: CancellationToken,
132    /// Cancellation token for stopping the pipeline task.
133    /// This allows independent control of the pipeline lifecycle (for suspend/resume).
134    pipeline_cancel_token: CancellationToken,
135    /// Channel sender for sending exchanges to the pipeline.
136    /// Stored to allow resuming a suspended route without recreating the channel.
137    channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
138}
139
140/// Wait for a pipeline service to be ready with circuit breaker backoff.
141///
142/// This helper encapsulates the pattern of repeatedly calling `ready()` on a
143/// service while handling `CircuitOpen` errors with a fixed 1-second backoff and
144/// cancellation checks. It returns `Ok(())` when the service is ready, or
145/// `Err(e)` if cancellation occurred or a fatal error was encountered.
146async fn ready_with_backoff(
147    pipeline: &mut BoxProcessor,
148    cancel: &CancellationToken,
149) -> Result<(), CamelError> {
150    loop {
151        match pipeline.ready().await {
152            Ok(_) => return Ok(()),
153            Err(CamelError::CircuitOpen(ref msg)) => {
154                warn!("Circuit open, backing off: {msg}");
155                tokio::select! {
156                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
157                        continue;
158                    }
159                    _ = cancel.cancelled() => {
160                        // Shutting down — don't retry.
161                        return Err(CamelError::CircuitOpen(msg.clone()));
162                    }
163                }
164            }
165            Err(e) => {
166                error!("Pipeline not ready: {e}");
167                return Err(e);
168            }
169        }
170    }
171}
172
173/// Default implementation of [`RouteController`].
174///
175/// Manages route lifecycle with support for:
176/// - Starting/stopping individual routes
177/// - Suspending and resuming routes
178/// - Auto-startup with startup ordering
179/// - Graceful shutdown
180pub struct DefaultRouteController {
181    /// Routes indexed by route ID.
182    routes: HashMap<String, ManagedRoute>,
183    /// Reference to the component registry for resolving endpoints.
184    registry: Arc<std::sync::Mutex<Registry>>,
185    /// Shared language registry for resolving declarative language expressions.
186    languages: SharedLanguageRegistry,
187    /// Bean registry for bean method invocation.
188    beans: Arc<std::sync::Mutex<BeanRegistry>>,
189    /// Self-reference for creating ProducerContext.
190    /// Set after construction via `set_self_ref()`.
191    self_ref: Option<Arc<Mutex<dyn RouteController>>>,
192    /// Optional global error handler applied to all routes without a per-route handler.
193    global_error_handler: Option<ErrorHandlerConfig>,
194    /// Optional crash notifier for supervision.
195    crash_notifier: Option<mpsc::Sender<CrashNotification>>,
196    /// Whether tracing is enabled for route pipelines.
197    tracing_enabled: bool,
198    /// Detail level for tracing when enabled.
199    tracer_detail_level: DetailLevel,
200}
201
202impl DefaultRouteController {
203    /// Create a new `DefaultRouteController` with the given registry.
204    pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
205        Self::with_beans(
206            registry,
207            Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
208        )
209    }
210
211    /// Create a new `DefaultRouteController` with shared bean registry.
212    pub fn with_beans(
213        registry: Arc<std::sync::Mutex<Registry>>,
214        beans: Arc<std::sync::Mutex<BeanRegistry>>,
215    ) -> Self {
216        Self {
217            routes: HashMap::new(),
218            registry,
219            languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
220            beans,
221            self_ref: None,
222            global_error_handler: None,
223            crash_notifier: None,
224            tracing_enabled: false,
225            tracer_detail_level: DetailLevel::Minimal,
226        }
227    }
228
229    /// Create a new `DefaultRouteController` with shared language registry.
230    pub fn with_languages(
231        registry: Arc<std::sync::Mutex<Registry>>,
232        languages: SharedLanguageRegistry,
233    ) -> Self {
234        Self {
235            routes: HashMap::new(),
236            registry,
237            languages,
238            beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
239            self_ref: None,
240            global_error_handler: None,
241            crash_notifier: None,
242            tracing_enabled: false,
243            tracer_detail_level: DetailLevel::Minimal,
244        }
245    }
246
247    /// Set the self-reference for creating ProducerContext.
248    ///
249    /// This must be called after wrapping the controller in `Arc<Mutex<>>`.
250    pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
251        self.self_ref = Some(self_ref);
252    }
253
254    /// Get the self-reference, if set.
255    ///
256    /// Used by [`SupervisingRouteController`](crate::supervising_route_controller::SupervisingRouteController)
257    /// to spawn the supervision loop.
258    pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
259        self.self_ref.clone()
260    }
261
262    /// Set the crash notifier for supervision.
263    ///
264    /// When set, the controller will send a [`CrashNotification`] whenever
265    /// a consumer crashes.
266    pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
267        self.crash_notifier = Some(tx);
268    }
269
270    /// Set a global error handler applied to all routes without a per-route handler.
271    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
272        self.global_error_handler = Some(config);
273    }
274
275    /// Configure tracing for this route controller.
276    pub fn set_tracer_config(&mut self, config: &TracerConfig) {
277        self.tracing_enabled = config.enabled;
278        self.tracer_detail_level = config.detail_level.clone();
279    }
280
281    /// Resolve an `ErrorHandlerConfig` into an `ErrorHandlerLayer`.
282    fn resolve_error_handler(
283        &self,
284        config: ErrorHandlerConfig,
285        producer_ctx: &ProducerContext,
286        registry: &Registry,
287    ) -> Result<ErrorHandlerLayer, CamelError> {
288        // Resolve DLC URI → producer.
289        let dlc_producer = if let Some(ref uri) = config.dlc_uri {
290            let parsed = parse_uri(uri)?;
291            let component = registry.get_or_err(&parsed.scheme)?;
292            let endpoint = component.create_endpoint(uri)?;
293            Some(endpoint.create_producer(producer_ctx)?)
294        } else {
295            None
296        };
297
298        // Resolve per-policy `handled_by` URIs.
299        let mut resolved_policies = Vec::new();
300        for policy in config.policies {
301            let handler_producer = if let Some(ref uri) = policy.handled_by {
302                let parsed = parse_uri(uri)?;
303                let component = registry.get_or_err(&parsed.scheme)?;
304                let endpoint = component.create_endpoint(uri)?;
305                Some(endpoint.create_producer(producer_ctx)?)
306            } else {
307                None
308            };
309            resolved_policies.push((policy, handler_producer));
310        }
311
312        Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
313    }
314
315    fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
316        let guard = self
317            .languages
318            .lock()
319            .expect("mutex poisoned: another thread panicked while holding this lock");
320        guard.get(language).cloned().ok_or_else(|| {
321            CamelError::RouteError(format!(
322                "language `{language}` is not registered in CamelContext"
323            ))
324        })
325    }
326
327    fn compile_language_expression(
328        &self,
329        expression: &LanguageExpressionDef,
330    ) -> Result<Arc<dyn Expression>, CamelError> {
331        let language = self.resolve_language(&expression.language)?;
332        let compiled = language
333            .create_expression(&expression.source)
334            .map_err(|e| {
335                CamelError::RouteError(format!(
336                    "failed to compile {} expression `{}`: {e}",
337                    expression.language, expression.source
338                ))
339            })?;
340        Ok(Arc::from(compiled))
341    }
342
343    fn compile_language_predicate(
344        &self,
345        expression: &LanguageExpressionDef,
346    ) -> Result<Arc<dyn Predicate>, CamelError> {
347        let language = self.resolve_language(&expression.language)?;
348        let compiled = language.create_predicate(&expression.source).map_err(|e| {
349            CamelError::RouteError(format!(
350                "failed to compile {} predicate `{}`: {e}",
351                expression.language, expression.source
352            ))
353        })?;
354        Ok(Arc::from(compiled))
355    }
356
357    fn compile_filter_predicate(
358        &self,
359        expression: &LanguageExpressionDef,
360    ) -> Result<FilterPredicate, CamelError> {
361        let predicate = self.compile_language_predicate(expression)?;
362        Ok(Arc::new(move |exchange: &Exchange| {
363            predicate.matches(exchange).unwrap_or(false)
364        }))
365    }
366
367    fn value_to_body(value: Value) -> Body {
368        match value {
369            Value::Null => Body::Empty,
370            Value::String(text) => Body::Text(text),
371            other => Body::Json(other),
372        }
373    }
374
375    /// Resolve BuilderSteps into BoxProcessors.
376    pub(crate) fn resolve_steps(
377        &self,
378        steps: Vec<BuilderStep>,
379        producer_ctx: &ProducerContext,
380        registry: &Registry,
381    ) -> Result<Vec<BoxProcessor>, CamelError> {
382        let mut processors: Vec<BoxProcessor> = Vec::new();
383        for step in steps {
384            match step {
385                BuilderStep::Processor(svc) => {
386                    processors.push(svc);
387                }
388                BuilderStep::To(uri) => {
389                    let parsed = parse_uri(&uri)?;
390                    let component = registry.get_or_err(&parsed.scheme)?;
391                    let endpoint = component.create_endpoint(&uri)?;
392                    let producer = endpoint.create_producer(producer_ctx)?;
393                    processors.push(producer);
394                }
395                BuilderStep::DeclarativeSetHeader { key, value } => match value {
396                    ValueSourceDef::Literal(value) => {
397                        let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
398                        processors.push(BoxProcessor::new(svc));
399                    }
400                    ValueSourceDef::Expression(expression) => {
401                        let expression = self.compile_language_expression(&expression)?;
402                        let svc = camel_processor::DynamicSetHeader::new(
403                            IdentityProcessor,
404                            key,
405                            move |exchange: &Exchange| {
406                                expression.evaluate(exchange).unwrap_or(Value::Null)
407                            },
408                        );
409                        processors.push(BoxProcessor::new(svc));
410                    }
411                },
412                BuilderStep::DeclarativeSetBody { value } => match value {
413                    ValueSourceDef::Literal(value) => {
414                        let body = Self::value_to_body(value);
415                        let svc = camel_processor::SetBody::new(
416                            IdentityProcessor,
417                            move |_exchange: &Exchange| body.clone(),
418                        );
419                        processors.push(BoxProcessor::new(svc));
420                    }
421                    ValueSourceDef::Expression(expression) => {
422                        let expression = self.compile_language_expression(&expression)?;
423                        let svc = camel_processor::SetBody::new(
424                            IdentityProcessor,
425                            move |exchange: &Exchange| {
426                                let value = expression.evaluate(exchange).unwrap_or(Value::Null);
427                                Self::value_to_body(value)
428                            },
429                        );
430                        processors.push(BoxProcessor::new(svc));
431                    }
432                },
433                BuilderStep::DeclarativeFilter { predicate, steps } => {
434                    let predicate = self.compile_filter_predicate(&predicate)?;
435                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
436                    let sub_pipeline = compose_pipeline(sub_processors);
437                    let svc =
438                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
439                    processors.push(BoxProcessor::new(svc));
440                }
441                BuilderStep::DeclarativeChoice { whens, otherwise } => {
442                    let mut when_clauses = Vec::new();
443                    for when_step in whens {
444                        let predicate = self.compile_filter_predicate(&when_step.predicate)?;
445                        let sub_processors =
446                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
447                        let pipeline = compose_pipeline(sub_processors);
448                        when_clauses.push(WhenClause {
449                            predicate,
450                            pipeline,
451                        });
452                    }
453                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
454                        let sub_processors =
455                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
456                        Some(compose_pipeline(sub_processors))
457                    } else {
458                        None
459                    };
460                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
461                    processors.push(BoxProcessor::new(svc));
462                }
463                BuilderStep::DeclarativeScript { expression } => {
464                    let expression = self.compile_language_expression(&expression)?;
465                    let svc = camel_processor::SetBody::new(
466                        IdentityProcessor,
467                        move |exchange: &Exchange| {
468                            let value = expression.evaluate(exchange).unwrap_or(Value::Null);
469                            Self::value_to_body(value)
470                        },
471                    );
472                    processors.push(BoxProcessor::new(svc));
473                }
474                BuilderStep::Split { config, steps } => {
475                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
476                    let sub_pipeline = compose_pipeline(sub_processors);
477                    let splitter =
478                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
479                    processors.push(BoxProcessor::new(splitter));
480                }
481                BuilderStep::DeclarativeSplit {
482                    expression,
483                    aggregation,
484                    parallel,
485                    parallel_limit,
486                    stop_on_exception,
487                    steps,
488                } => {
489                    let lang_expr = self.compile_language_expression(&expression)?;
490                    let split_fn = move |exchange: &Exchange| {
491                        let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
492                        match value {
493                            Value::String(s) => s
494                                .lines()
495                                .filter(|line| !line.is_empty())
496                                .map(|line| {
497                                    let mut fragment = exchange.clone();
498                                    fragment.input.body = Body::from(line.to_string());
499                                    fragment
500                                })
501                                .collect(),
502                            Value::Array(arr) => arr
503                                .into_iter()
504                                .map(|v| {
505                                    let mut fragment = exchange.clone();
506                                    fragment.input.body = Body::from(v);
507                                    fragment
508                                })
509                                .collect(),
510                            _ => vec![exchange.clone()],
511                        }
512                    };
513
514                    let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
515                        .aggregation(aggregation)
516                        .parallel(parallel)
517                        .stop_on_exception(stop_on_exception);
518                    if let Some(limit) = parallel_limit {
519                        config = config.parallel_limit(limit);
520                    }
521
522                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
523                    let sub_pipeline = compose_pipeline(sub_processors);
524                    let splitter =
525                        camel_processor::splitter::SplitterService::new(config, sub_pipeline);
526                    processors.push(BoxProcessor::new(splitter));
527                }
528                BuilderStep::Aggregate { config } => {
529                    let svc = camel_processor::AggregatorService::new(config);
530                    processors.push(BoxProcessor::new(svc));
531                }
532                BuilderStep::Filter { predicate, steps } => {
533                    let sub_processors = self.resolve_steps(steps, producer_ctx, registry)?;
534                    let sub_pipeline = compose_pipeline(sub_processors);
535                    let svc =
536                        camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
537                    processors.push(BoxProcessor::new(svc));
538                }
539                BuilderStep::Choice { whens, otherwise } => {
540                    // Resolve each when clause's sub-steps into a pipeline.
541                    let mut when_clauses = Vec::new();
542                    for when_step in whens {
543                        let sub_processors =
544                            self.resolve_steps(when_step.steps, producer_ctx, registry)?;
545                        let pipeline = compose_pipeline(sub_processors);
546                        when_clauses.push(WhenClause {
547                            predicate: when_step.predicate,
548                            pipeline,
549                        });
550                    }
551                    // Resolve otherwise branch (if present).
552                    let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
553                        let sub_processors =
554                            self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
555                        Some(compose_pipeline(sub_processors))
556                    } else {
557                        None
558                    };
559                    let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
560                    processors.push(BoxProcessor::new(svc));
561                }
562                BuilderStep::WireTap { uri } => {
563                    let parsed = parse_uri(&uri)?;
564                    let component = registry.get_or_err(&parsed.scheme)?;
565                    let endpoint = component.create_endpoint(&uri)?;
566                    let producer = endpoint.create_producer(producer_ctx)?;
567                    let svc = camel_processor::WireTapService::new(producer);
568                    processors.push(BoxProcessor::new(svc));
569                }
570                BuilderStep::Multicast { config, steps } => {
571                    // Each top-level step in the multicast scope becomes an independent endpoint.
572                    let mut endpoints = Vec::new();
573                    for step in steps {
574                        let sub_processors =
575                            self.resolve_steps(vec![step], producer_ctx, registry)?;
576                        let endpoint = compose_pipeline(sub_processors);
577                        endpoints.push(endpoint);
578                    }
579                    let svc = camel_processor::MulticastService::new(endpoints, config);
580                    processors.push(BoxProcessor::new(svc));
581                }
582                BuilderStep::DeclarativeLog { level, message } => {
583                    let ValueSourceDef::Expression(expression) = message else {
584                        // Literal case is already converted to a Processor in compile.rs;
585                        // this arm should never be reached for literals.
586                        unreachable!(
587                            "DeclarativeLog with Literal should have been compiled to a Processor"
588                        );
589                    };
590                    let expression = self.compile_language_expression(&expression)?;
591                    let svc =
592                        camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
593                            expression
594                                .evaluate(exchange)
595                                .unwrap_or_else(|e| {
596                                    warn!(error = %e, "log expression evaluation failed");
597                                    Value::Null
598                                })
599                                .to_string()
600                        });
601                    processors.push(BoxProcessor::new(svc));
602                }
603                BuilderStep::Bean { name, method } => {
604                    // Lock beans registry to lookup bean
605                    let beans = self.beans.lock().expect(
606                        "beans mutex poisoned: another thread panicked while holding this lock",
607                    );
608
609                    // Lookup bean by name
610                    let bean = beans.get(&name).ok_or_else(|| {
611                        CamelError::ProcessorError(format!("Bean not found: {}", name))
612                    })?;
613
614                    // Clone Arc for async closure (release lock before async)
615                    let bean_clone = Arc::clone(&bean);
616                    let method = method.clone();
617
618                    // Create processor that invokes bean method
619                    let processor = tower::service_fn(move |mut exchange: Exchange| {
620                        let bean = Arc::clone(&bean_clone);
621                        let method = method.clone();
622
623                        async move {
624                            bean.call(&method, &mut exchange).await?;
625                            Ok(exchange)
626                        }
627                    });
628
629                    processors.push(BoxProcessor::new(processor));
630                }
631            }
632        }
633        Ok(processors)
634    }
635
636    /// Add a route definition to the controller.
637    ///
638    /// Steps are resolved immediately using the registry.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if:
643    /// - A route with the same ID already exists
644    /// - Step resolution fails
645    pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
646        let route_id = definition.route_id().to_string();
647
648        if self.routes.contains_key(&route_id) {
649            return Err(CamelError::RouteError(format!(
650                "Route '{}' already exists",
651                route_id
652            )));
653        }
654
655        info!(route_id = %route_id, "Adding route to controller");
656
657        // Extract definition info for storage before steps are consumed
658        let definition_info = definition.to_info();
659        let from_uri = definition.from_uri.to_string();
660        let concurrency = definition.concurrency;
661
662        // Create ProducerContext from self_ref for step resolution
663        let producer_ctx = self
664            .self_ref
665            .clone()
666            .map(ProducerContext::new)
667            .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
668
669        // Lock registry for step resolution
670        let registry = self
671            .registry
672            .lock()
673            .expect("mutex poisoned: another thread panicked while holding this lock");
674
675        // Resolve steps into processors (takes ownership of steps)
676        let processors = self.resolve_steps(definition.steps, &producer_ctx, &registry)?;
677        let route_id_for_tracing = route_id.clone();
678        let mut pipeline = compose_traced_pipeline(
679            processors,
680            &route_id_for_tracing,
681            self.tracing_enabled,
682            self.tracer_detail_level.clone(),
683        );
684
685        // Apply circuit breaker if configured
686        if let Some(cb_config) = definition.circuit_breaker {
687            let cb_layer = CircuitBreakerLayer::new(cb_config);
688            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
689        }
690
691        // Determine which error handler config to use (per-route takes precedence)
692        let eh_config = definition
693            .error_handler
694            .or_else(|| self.global_error_handler.clone());
695
696        if let Some(config) = eh_config {
697            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
698            pipeline = BoxProcessor::new(layer.layer(pipeline));
699        }
700
701        // Drop the lock before modifying self.routes
702        drop(registry);
703
704        self.routes.insert(
705            route_id.clone(),
706            ManagedRoute {
707                definition: definition_info,
708                from_uri,
709                pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
710                concurrency,
711                status: Arc::new(std::sync::Mutex::new(RouteStatus::Stopped)),
712                consumer_handle: None,
713                pipeline_handle: None,
714                consumer_cancel_token: CancellationToken::new(),
715                pipeline_cancel_token: CancellationToken::new(),
716                channel_sender: None,
717            },
718        );
719
720        Ok(())
721    }
722
723    /// Compile a `RouteDefinition` into a `BoxProcessor` without inserting into the route map.
724    ///
725    /// Used by hot-reload to prepare a new pipeline for atomic swap without disrupting
726    /// the running route. The caller is responsible for swapping via `swap_pipeline`.
727    pub fn compile_route_definition(
728        &self,
729        def: RouteDefinition,
730    ) -> Result<BoxProcessor, CamelError> {
731        let route_id = def.route_id().to_string();
732
733        let producer_ctx = self
734            .self_ref
735            .clone()
736            .map(ProducerContext::new)
737            .ok_or_else(|| CamelError::RouteError("RouteController self_ref not set".into()))?;
738
739        let registry = self
740            .registry
741            .lock()
742            .expect("mutex poisoned: registry lock in compile_route_definition");
743
744        let processors = self.resolve_steps(def.steps, &producer_ctx, &registry)?;
745        let mut pipeline = compose_traced_pipeline(
746            processors,
747            &route_id,
748            self.tracing_enabled,
749            self.tracer_detail_level.clone(),
750        );
751
752        if let Some(cb_config) = def.circuit_breaker {
753            let cb_layer = CircuitBreakerLayer::new(cb_config);
754            pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
755        }
756
757        let eh_config = def
758            .error_handler
759            .or_else(|| self.global_error_handler.clone());
760        if let Some(config) = eh_config {
761            let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
762            pipeline = BoxProcessor::new(layer.layer(pipeline));
763        }
764
765        drop(registry);
766        Ok(pipeline)
767    }
768
769    /// Remove a route from the controller map.
770    ///
771    /// The route **must** be stopped before removal (status `Stopped` or `Failed`).
772    /// Returns an error if the route is still running or does not exist.
773    /// Does not cancel any running tasks — call `stop_route` first.
774    pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
775        let managed = self.routes.get(route_id).ok_or_else(|| {
776            CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
777        })?;
778        let status = managed
779            .status
780            .lock()
781            .expect("status mutex poisoned")
782            .clone();
783        match status {
784            RouteStatus::Stopped | RouteStatus::Failed(_) => {}
785            other => {
786                return Err(CamelError::RouteError(format!(
787                    "Route '{}' must be stopped before removal (current status: {:?})",
788                    route_id, other
789                )));
790            }
791        }
792        self.routes.remove(route_id);
793        info!(route_id = %route_id, "Route removed from controller");
794        Ok(())
795    }
796
797    /// Returns the number of routes in the controller.
798    pub fn route_count(&self) -> usize {
799        self.routes.len()
800    }
801
802    /// Force-set the status of a route. Only for use in tests.
803    #[doc(hidden)]
804    pub fn force_route_status(&mut self, route_id: &str, status: RouteStatus) {
805        if let Some(managed) = self.routes.get(route_id) {
806            *managed.status.lock().expect("status mutex poisoned") = status;
807        }
808    }
809
810    /// Returns all route IDs.
811    pub fn route_ids(&self) -> Vec<String> {
812        self.routes.keys().cloned().collect()
813    }
814
815    /// Atomically swap the pipeline of a route.
816    ///
817    /// In-flight requests finish with the old pipeline (kept alive by Arc).
818    /// New requests immediately use the new pipeline.
819    pub fn swap_pipeline(
820        &self,
821        route_id: &str,
822        new_pipeline: BoxProcessor,
823    ) -> Result<(), CamelError> {
824        let managed = self
825            .routes
826            .get(route_id)
827            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
828
829        managed
830            .pipeline
831            .store(Arc::new(SyncBoxProcessor(new_pipeline)));
832        info!(route_id = %route_id, "Pipeline swapped atomically");
833        Ok(())
834    }
835
836    /// Returns the from_uri of a route, if it exists.
837    pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
838        self.routes.get(route_id).map(|r| r.from_uri.clone())
839    }
840
841    /// Get a clone of the current pipeline for a route.
842    ///
843    /// This is useful for testing and introspection.
844    /// Returns `None` if the route doesn't exist.
845    pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
846        self.routes
847            .get(route_id)
848            .map(|r| r.pipeline.load().0.clone())
849    }
850
851    /// Internal stop implementation that can set custom status.
852    async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
853        let managed = self
854            .routes
855            .get_mut(route_id)
856            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
857
858        let current_status = managed
859            .status
860            .lock()
861            .expect("status mutex poisoned")
862            .clone();
863        if current_status != RouteStatus::Started && current_status != RouteStatus::Suspended {
864            return Ok(()); // Already stopped or stopping
865        }
866
867        info!(route_id = %route_id, "Stopping route");
868        *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopping;
869
870        // Cancel both tokens to signal shutdown for consumer and pipeline independently
871        let managed = self
872            .routes
873            .get_mut(route_id)
874            .expect("invariant: route must exist after prior existence check");
875        managed.consumer_cancel_token.cancel();
876        managed.pipeline_cancel_token.cancel();
877
878        // Take handles directly (no Arc<Mutex> wrapper needed)
879        let managed = self
880            .routes
881            .get_mut(route_id)
882            .expect("invariant: route must exist after prior existence check");
883        let consumer_handle = managed.consumer_handle.take();
884        let pipeline_handle = managed.pipeline_handle.take();
885
886        // IMPORTANT: Drop channel_sender early so rx.recv() returns None
887        // This ensures the pipeline task can exit even if idle on recv()
888        let managed = self
889            .routes
890            .get_mut(route_id)
891            .expect("invariant: route must exist after prior existence check");
892        managed.channel_sender = None;
893
894        // Wait for tasks to complete with timeout
895        // The CancellationToken already signaled tasks to stop gracefully.
896        // Combined with the select! in pipeline loops, this should exit quickly.
897        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
898            match (consumer_handle, pipeline_handle) {
899                (Some(c), Some(p)) => {
900                    let _ = tokio::join!(c, p);
901                }
902                (Some(c), None) => {
903                    let _ = c.await;
904                }
905                (None, Some(p)) => {
906                    let _ = p.await;
907                }
908                (None, None) => {}
909            }
910        })
911        .await;
912
913        if timeout_result.is_err() {
914            warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
915        }
916
917        // Get the managed route again (can't hold across await)
918        let managed = self
919            .routes
920            .get_mut(route_id)
921            .expect("invariant: route must exist after prior existence check");
922
923        // Create fresh cancellation tokens for next start
924        managed.consumer_cancel_token = CancellationToken::new();
925        managed.pipeline_cancel_token = CancellationToken::new();
926        *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopped;
927
928        info!(route_id = %route_id, "Route stopped");
929        Ok(())
930    }
931}
932
933#[async_trait::async_trait]
934impl RouteController for DefaultRouteController {
935    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
936        // Check if route exists and can be started, and update status atomically
937        {
938            let managed = self
939                .routes
940                .get_mut(route_id)
941                .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
942
943            let current_status = managed
944                .status
945                .lock()
946                .expect("status mutex poisoned")
947                .clone();
948            match current_status {
949                RouteStatus::Started => return Ok(()), // Already running
950                RouteStatus::Starting => {
951                    return Err(CamelError::RouteError(format!(
952                        "Route '{}' is already starting",
953                        route_id
954                    )));
955                }
956                RouteStatus::Stopped | RouteStatus::Failed(_) => {} // OK to start
957                RouteStatus::Stopping => {
958                    return Err(CamelError::RouteError(format!(
959                        "Route '{}' is stopping",
960                        route_id
961                    )));
962                }
963                RouteStatus::Suspended => {
964                    return Err(CamelError::RouteError(format!(
965                        "Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
966                        route_id
967                    )));
968                }
969            }
970            *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Starting;
971        }
972
973        info!(route_id = %route_id, "Starting route");
974
975        // Get the resolved route info
976        let (from_uri, pipeline, concurrency, status_for_consumer) = {
977            let managed = self
978                .routes
979                .get(route_id)
980                .expect("invariant: route must exist after prior existence check");
981            (
982                managed.from_uri.clone(),
983                Arc::clone(&managed.pipeline),
984                managed.concurrency.clone(),
985                Arc::clone(&managed.status),
986            )
987        };
988
989        // Clone crash notifier for consumer task
990        let crash_notifier = self.crash_notifier.clone();
991
992        // Parse from URI and create consumer (lock registry for lookup)
993        let parsed = parse_uri(&from_uri)?;
994        let registry = self
995            .registry
996            .lock()
997            .expect("mutex poisoned: another thread panicked while holding this lock");
998        let component = registry.get_or_err(&parsed.scheme)?;
999        let endpoint = component.create_endpoint(&from_uri)?;
1000        let mut consumer = endpoint.create_consumer()?;
1001        let consumer_concurrency = consumer.concurrency_model();
1002        // Drop the lock before spawning tasks
1003        drop(registry);
1004
1005        // Resolve effective concurrency: route override > consumer default
1006        let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
1007
1008        // Get the managed route for mutation
1009        let managed = self
1010            .routes
1011            .get_mut(route_id)
1012            .expect("invariant: route must exist after prior existence check");
1013
1014        // Create channel for consumer to send exchanges
1015        let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
1016        // Create child tokens for independent lifecycle control
1017        let consumer_cancel = managed.consumer_cancel_token.child_token();
1018        let pipeline_cancel = managed.pipeline_cancel_token.child_token();
1019        // Clone sender for storage (to reuse on resume)
1020        let tx_for_storage = tx.clone();
1021        let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
1022
1023        // Start consumer in background task.
1024        // Status is shared via Arc<Mutex<>> so the task can update it on crash.
1025        let route_id_for_consumer = route_id.to_string();
1026        let consumer_handle = tokio::spawn(async move {
1027            if let Err(e) = consumer.start(consumer_ctx).await {
1028                error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
1029                let error_msg = e.to_string();
1030                *status_for_consumer.lock().expect("status mutex poisoned") =
1031                    RouteStatus::Failed(error_msg.clone());
1032
1033                // Send crash notification if notifier is configured
1034                if let Some(tx) = crash_notifier {
1035                    let _ = tx
1036                        .send(CrashNotification {
1037                            route_id: route_id_for_consumer.clone(),
1038                            error: error_msg,
1039                        })
1040                        .await;
1041                }
1042            }
1043        });
1044
1045        // Spawn pipeline task with its own cancellation token
1046        let pipeline_handle = match effective_concurrency {
1047            ConcurrencyModel::Sequential => {
1048                tokio::spawn(async move {
1049                    loop {
1050                        // Use select! to exit promptly on cancellation even when idle
1051                        let envelope = tokio::select! {
1052                            envelope = rx.recv() => match envelope {
1053                                Some(e) => e,
1054                                None => return, // Channel closed
1055                            },
1056                            _ = pipeline_cancel.cancelled() => {
1057                                // Cancellation requested - exit gracefully
1058                                return;
1059                            }
1060                        };
1061                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1062
1063                        // Load current pipeline from ArcSwap (picks up hot-reloaded pipelines)
1064                        let mut pipeline = pipeline.load().0.clone();
1065
1066                        if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
1067                            if let Some(tx) = reply_tx {
1068                                let _ = tx.send(Err(e));
1069                            }
1070                            return;
1071                        }
1072
1073                        let result = pipeline.call(exchange).await;
1074                        if let Some(tx) = reply_tx {
1075                            let _ = tx.send(result);
1076                        } else if let Err(ref e) = result
1077                            && !matches!(e, CamelError::Stopped)
1078                        {
1079                            error!("Pipeline error: {e}");
1080                        }
1081                    }
1082                })
1083            }
1084            ConcurrencyModel::Concurrent { max } => {
1085                let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1086                tokio::spawn(async move {
1087                    loop {
1088                        // Use select! to exit promptly on cancellation even when idle
1089                        let envelope = tokio::select! {
1090                            envelope = rx.recv() => match envelope {
1091                                Some(e) => e,
1092                                None => return, // Channel closed
1093                            },
1094                            _ = pipeline_cancel.cancelled() => {
1095                                // Cancellation requested - exit gracefully
1096                                return;
1097                            }
1098                        };
1099                        let ExchangeEnvelope { exchange, reply_tx } = envelope;
1100                        let pipe_ref = Arc::clone(&pipeline);
1101                        let sem = sem.clone();
1102                        let cancel = pipeline_cancel.clone();
1103                        tokio::spawn(async move {
1104                            // Acquire semaphore permit if bounded
1105                            let _permit = match &sem {
1106                                Some(s) => Some(s.acquire().await.expect("semaphore closed")),
1107                                None => None,
1108                            };
1109
1110                            // Load current pipeline from ArcSwap
1111                            let mut pipe = pipe_ref.load().0.clone();
1112
1113                            // Wait for service ready with circuit breaker backoff
1114                            if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
1115                                if let Some(tx) = reply_tx {
1116                                    let _ = tx.send(Err(e));
1117                                }
1118                                return;
1119                            }
1120
1121                            let result = pipe.call(exchange).await;
1122                            if let Some(tx) = reply_tx {
1123                                let _ = tx.send(result);
1124                            } else if let Err(ref e) = result
1125                                && !matches!(e, CamelError::Stopped)
1126                            {
1127                                error!("Pipeline error: {e}");
1128                            }
1129                        });
1130                    }
1131                })
1132            }
1133        };
1134
1135        // Store handles and update status
1136        let managed = self
1137            .routes
1138            .get_mut(route_id)
1139            .expect("invariant: route must exist after prior existence check");
1140        managed.consumer_handle = Some(consumer_handle);
1141        managed.pipeline_handle = Some(pipeline_handle);
1142        managed.channel_sender = Some(tx_for_storage);
1143        // Only mark as Started if consumer hasn't already crashed
1144        let mut status_guard = managed.status.lock().expect("status mutex poisoned");
1145        if !matches!(*status_guard, RouteStatus::Failed(_)) {
1146            *status_guard = RouteStatus::Started;
1147        }
1148        drop(status_guard);
1149
1150        info!(route_id = %route_id, "Route started");
1151        Ok(())
1152    }
1153
1154    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1155        self.stop_route_internal(route_id).await
1156    }
1157
1158    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1159        self.stop_route(route_id).await?;
1160        tokio::time::sleep(Duration::from_millis(100)).await;
1161        self.start_route(route_id).await
1162    }
1163
1164    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1165        // Check route exists and get current status
1166        let managed = self
1167            .routes
1168            .get_mut(route_id)
1169            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1170
1171        let current_status = managed
1172            .status
1173            .lock()
1174            .expect("status mutex poisoned")
1175            .clone();
1176
1177        // Can only suspend from Started state
1178        if current_status != RouteStatus::Started {
1179            return Err(CamelError::RouteError(format!(
1180                "Cannot suspend route '{}' with status {:?}",
1181                route_id, current_status
1182            )));
1183        }
1184
1185        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
1186
1187        // Set status to Stopping during suspend
1188        *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Stopping;
1189
1190        // Cancel consumer token only (keep pipeline running)
1191        let managed = self
1192            .routes
1193            .get_mut(route_id)
1194            .expect("invariant: route must exist after prior existence check");
1195        managed.consumer_cancel_token.cancel();
1196
1197        // Take and join consumer handle
1198        let managed = self
1199            .routes
1200            .get_mut(route_id)
1201            .expect("invariant: route must exist after prior existence check");
1202        let consumer_handle = managed.consumer_handle.take();
1203
1204        // Wait for consumer task to complete with timeout
1205        let timeout_result = tokio::time::timeout(Duration::from_secs(30), async {
1206            if let Some(handle) = consumer_handle {
1207                let _ = handle.await;
1208            }
1209        })
1210        .await;
1211
1212        if timeout_result.is_err() {
1213            warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
1214        }
1215
1216        // Get the managed route again (can't hold across await)
1217        let managed = self
1218            .routes
1219            .get_mut(route_id)
1220            .expect("invariant: route must exist after prior existence check");
1221
1222        // Create fresh cancellation token for consumer (for resume)
1223        managed.consumer_cancel_token = CancellationToken::new();
1224
1225        // Set status to Suspended (pipeline is still running)
1226        *managed.status.lock().expect("status mutex poisoned") = RouteStatus::Suspended;
1227
1228        info!(route_id = %route_id, "Route suspended (pipeline still running)");
1229        Ok(())
1230    }
1231
1232    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1233        // Check route exists and is Suspended
1234        let managed = self
1235            .routes
1236            .get(route_id)
1237            .ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
1238
1239        let current_status = managed
1240            .status
1241            .lock()
1242            .expect("status mutex poisoned")
1243            .clone();
1244
1245        if current_status != RouteStatus::Suspended {
1246            return Err(CamelError::RouteError(format!(
1247                "Cannot resume route '{}' with status {:?} (expected Suspended)",
1248                route_id, current_status
1249            )));
1250        }
1251
1252        // Get the stored channel sender (must exist for a suspended route)
1253        let sender = managed.channel_sender.clone().ok_or_else(|| {
1254            CamelError::RouteError("Suspended route has no channel sender".into())
1255        })?;
1256
1257        // Get from_uri and concurrency for creating new consumer
1258        let from_uri = managed.from_uri.clone();
1259
1260        info!(route_id = %route_id, "Resuming route (spawning consumer only)");
1261
1262        // Parse from URI and create consumer (lock registry for lookup)
1263        let parsed = parse_uri(&from_uri)?;
1264        let registry = self
1265            .registry
1266            .lock()
1267            .expect("mutex poisoned: another thread panicked while holding this lock");
1268        let component = registry.get_or_err(&parsed.scheme)?;
1269        let endpoint = component.create_endpoint(&from_uri)?;
1270        let mut consumer = endpoint.create_consumer()?;
1271        // Drop the lock before spawning tasks
1272        drop(registry);
1273
1274        // Get the managed route for mutation
1275        let managed = self
1276            .routes
1277            .get_mut(route_id)
1278            .expect("invariant: route must exist after prior existence check");
1279
1280        // Create child token for consumer lifecycle
1281        let consumer_cancel = managed.consumer_cancel_token.child_token();
1282
1283        // Clone status Arc for the consumer task
1284        let status_for_consumer = Arc::clone(&managed.status);
1285        let crash_notifier = self.crash_notifier.clone();
1286
1287        // Create ConsumerContext with the stored sender
1288        let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
1289
1290        // Spawn consumer task
1291        let route_id_for_consumer = route_id.to_string();
1292        let consumer_handle = tokio::spawn(async move {
1293            if let Err(e) = consumer.start(consumer_ctx).await {
1294                error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
1295                let error_msg = e.to_string();
1296                *status_for_consumer.lock().expect("status mutex poisoned") =
1297                    RouteStatus::Failed(error_msg.clone());
1298
1299                // Send crash notification if notifier is configured
1300                if let Some(tx) = crash_notifier {
1301                    let _ = tx
1302                        .send(CrashNotification {
1303                            route_id: route_id_for_consumer.clone(),
1304                            error: error_msg,
1305                        })
1306                        .await;
1307                }
1308            }
1309        });
1310
1311        // Store consumer handle and update status
1312        let managed = self
1313            .routes
1314            .get_mut(route_id)
1315            .expect("invariant: route must exist after prior existence check");
1316        managed.consumer_handle = Some(consumer_handle);
1317
1318        // Only mark as Started if consumer hasn't already crashed
1319        let mut status_guard = managed.status.lock().expect("status mutex poisoned");
1320        if !matches!(*status_guard, RouteStatus::Failed(_)) {
1321            *status_guard = RouteStatus::Started;
1322        } else {
1323            // Resume failed - return error
1324            let failed_status = status_guard.clone();
1325            drop(status_guard);
1326            return Err(CamelError::RouteError(format!(
1327                "Resume failed: {:?}",
1328                failed_status
1329            )));
1330        }
1331        drop(status_guard);
1332
1333        info!(route_id = %route_id, "Route resumed");
1334        Ok(())
1335    }
1336
1337    fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
1338        self.routes
1339            .get(route_id)
1340            .map(|r| r.status.lock().expect("status mutex poisoned").clone())
1341    }
1342
1343    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1344        // Only start routes where auto_startup() == true
1345        // Sort by startup_order() ascending before starting
1346        let route_ids: Vec<String> = {
1347            let mut pairs: Vec<_> = self
1348                .routes
1349                .iter()
1350                .filter(|(_, r)| r.definition.auto_startup())
1351                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1352                .collect();
1353            pairs.sort_by_key(|(_, order)| *order);
1354            pairs.into_iter().map(|(id, _)| id).collect()
1355        };
1356
1357        info!("Starting {} auto-startup routes", route_ids.len());
1358
1359        // Collect errors but continue starting remaining routes
1360        let mut errors: Vec<String> = Vec::new();
1361        for route_id in route_ids {
1362            if let Err(e) = self.start_route(&route_id).await {
1363                errors.push(format!("Route '{}': {}", route_id, e));
1364            }
1365        }
1366
1367        if !errors.is_empty() {
1368            return Err(CamelError::RouteError(format!(
1369                "Failed to start routes: {}",
1370                errors.join(", ")
1371            )));
1372        }
1373
1374        info!("All auto-startup routes started");
1375        Ok(())
1376    }
1377
1378    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1379        // Sort by startup_order descending (reverse order)
1380        let route_ids: Vec<String> = {
1381            let mut pairs: Vec<_> = self
1382                .routes
1383                .iter()
1384                .map(|(id, r)| (id.clone(), r.definition.startup_order()))
1385                .collect();
1386            pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
1387            pairs.into_iter().map(|(id, _)| id).collect()
1388        };
1389
1390        info!("Stopping {} routes", route_ids.len());
1391
1392        for route_id in route_ids {
1393            let _ = self.stop_route(&route_id).await;
1394        }
1395
1396        info!("All routes stopped");
1397        Ok(())
1398    }
1399}
1400
1401#[async_trait::async_trait]
1402impl RouteControllerInternal for DefaultRouteController {
1403    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
1404        DefaultRouteController::add_route(self, def)
1405    }
1406
1407    fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
1408        DefaultRouteController::swap_pipeline(self, route_id, pipeline)
1409    }
1410
1411    fn route_from_uri(&self, route_id: &str) -> Option<String> {
1412        // Call the inherent method which now returns Option<String>
1413        DefaultRouteController::route_from_uri(self, route_id)
1414    }
1415
1416    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
1417        DefaultRouteController::set_error_handler(self, config)
1418    }
1419
1420    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
1421        DefaultRouteController::set_self_ref(self, self_ref)
1422    }
1423
1424    fn route_count(&self) -> usize {
1425        DefaultRouteController::route_count(self)
1426    }
1427
1428    fn route_ids(&self) -> Vec<String> {
1429        DefaultRouteController::route_ids(self)
1430    }
1431
1432    fn set_tracer_config(&mut self, config: &TracerConfig) {
1433        DefaultRouteController::set_tracer_config(self, config)
1434    }
1435
1436    fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
1437        DefaultRouteController::compile_route_definition(self, def)
1438    }
1439
1440    fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
1441        DefaultRouteController::remove_route(self, route_id)
1442    }
1443
1444    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1445        DefaultRouteController::start_route(self, route_id).await
1446    }
1447
1448    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
1449        DefaultRouteController::stop_route(self, route_id).await
1450    }
1451}
1452
1453#[cfg(test)]
1454mod tests {
1455    use super::*;
1456    use std::sync::Arc;
1457
1458    #[test]
1459    fn test_route_controller_internal_is_object_safe() {
1460        // Verifies the trait compiles as a trait object.
1461        // If RouteControllerInternal is not object-safe, this test fails to compile.
1462        let _: Option<Box<dyn RouteControllerInternal>> = None;
1463    }
1464
1465    #[tokio::test]
1466    async fn test_swap_pipeline_updates_stored_pipeline() {
1467        use camel_api::IdentityProcessor;
1468
1469        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1470        let mut controller = DefaultRouteController::new(registry);
1471
1472        let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1473            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
1474        ));
1475        controller.set_self_ref(controller_arc);
1476
1477        let definition =
1478            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("swap-test");
1479        controller.add_route(definition).unwrap();
1480
1481        // Swap pipeline should succeed
1482        let new_pipeline = BoxProcessor::new(IdentityProcessor);
1483        let result = controller.swap_pipeline("swap-test", new_pipeline);
1484        assert!(result.is_ok());
1485
1486        // Swap on non-existent route should fail
1487        let new_pipeline = BoxProcessor::new(IdentityProcessor);
1488        let result = controller.swap_pipeline("nonexistent", new_pipeline);
1489        assert!(result.is_err());
1490    }
1491
1492    #[tokio::test]
1493    async fn test_add_route_duplicate_id_fails() {
1494        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1495        let mut controller = DefaultRouteController::new(registry);
1496
1497        // Set self_ref to avoid error during add_route
1498        let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1499            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
1500        ));
1501        controller.set_self_ref(controller_arc);
1502
1503        let definition = crate::route::RouteDefinition::new("timer:tick", vec![])
1504            .with_route_id("duplicate-route");
1505        assert!(controller.add_route(definition).is_ok());
1506
1507        // Adding a route with the same ID should fail
1508        let definition2 = crate::route::RouteDefinition::new("timer:tock", vec![])
1509            .with_route_id("duplicate-route");
1510        let result = controller.add_route(definition2);
1511        assert!(result.is_err());
1512        let err = result.unwrap_err().to_string();
1513        assert!(
1514            err.contains("already exists"),
1515            "error should mention 'already exists', got: {}",
1516            err
1517        );
1518    }
1519
1520    #[tokio::test]
1521    async fn test_add_route_with_id_succeeds() {
1522        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1523        let mut controller = DefaultRouteController::new(registry);
1524
1525        // Set self_ref to avoid error during add_route
1526        let controller_arc: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1527            DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new()))),
1528        ));
1529        controller.set_self_ref(controller_arc);
1530
1531        let definition =
1532            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
1533        assert!(controller.add_route(definition).is_ok());
1534        assert_eq!(controller.route_count(), 1);
1535    }
1536
1537    #[tokio::test]
1538    async fn test_crashed_consumer_sets_failed_status() {
1539        use async_trait::async_trait;
1540        use camel_api::{CamelError, RouteStatus};
1541        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
1542
1543        struct CrashingConsumer;
1544        #[async_trait]
1545        impl camel_component::Consumer for CrashingConsumer {
1546            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1547                Err(CamelError::RouteError("boom".into()))
1548            }
1549            async fn stop(&mut self) -> Result<(), CamelError> {
1550                Ok(())
1551            }
1552            fn concurrency_model(&self) -> ConcurrencyModel {
1553                ConcurrencyModel::Sequential
1554            }
1555        }
1556        struct CrashingEndpoint;
1557        impl Endpoint for CrashingEndpoint {
1558            fn uri(&self) -> &str {
1559                "crash:test"
1560            }
1561            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1562                Ok(Box::new(CrashingConsumer))
1563            }
1564            fn create_producer(
1565                &self,
1566                _ctx: &camel_api::ProducerContext,
1567            ) -> Result<camel_api::BoxProcessor, CamelError> {
1568                Err(CamelError::RouteError("no producer".into()))
1569            }
1570        }
1571        struct CrashingComponent;
1572        impl camel_component::Component for CrashingComponent {
1573            fn scheme(&self) -> &str {
1574                "crash"
1575            }
1576            fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1577                let _ = uri; // satisfy unused variable warning
1578                Ok(Box::new(CrashingEndpoint))
1579            }
1580        }
1581
1582        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1583        registry.lock().unwrap().register(CrashingComponent);
1584        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
1585        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1586            DefaultRouteController::new(Arc::clone(&registry)),
1587        ));
1588        controller.set_self_ref(self_ref);
1589
1590        let def =
1591            crate::route::RouteDefinition::new("crash:test", vec![]).with_route_id("crash-route");
1592        controller.add_route(def).unwrap();
1593        controller.start_route("crash-route").await.unwrap();
1594
1595        // Give consumer task time to crash and update status
1596        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1597
1598        let status = controller.route_status("crash-route").unwrap();
1599        assert!(
1600            matches!(status, RouteStatus::Failed(_)),
1601            "expected Failed, got {:?}",
1602            status
1603        );
1604    }
1605
1606    /// Test demonstrating independent consumer/pipeline cancellation token scaffolding.
1607    ///
1608    /// This test verifies that ManagedRoute has separate cancellation tokens
1609    /// for consumer and pipeline, enabling future independent lifecycle control
1610    /// (e.g., suspending consumer while pipeline continues processing in-flight messages).
1611    #[test]
1612    fn test_managed_route_has_independent_cancel_tokens() {
1613        // This test verifies the structure exists by checking that:
1614        // 1. Both tokens are created on route addition
1615        // 2. Both tokens are recreated after stop
1616        //
1617        // The actual independent control will be implemented in Task 3.
1618        // This test serves as scaffolding verification.
1619
1620        let consumer_token = CancellationToken::new();
1621        let pipeline_token = CancellationToken::new();
1622
1623        // Verify tokens start uncancelled
1624        assert!(
1625            !consumer_token.is_cancelled(),
1626            "consumer token should start uncancelled"
1627        );
1628        assert!(
1629            !pipeline_token.is_cancelled(),
1630            "pipeline token should start uncancelled"
1631        );
1632
1633        // Verify tokens can be cancelled independently
1634        consumer_token.cancel();
1635        assert!(
1636            consumer_token.is_cancelled(),
1637            "consumer token should be cancelled"
1638        );
1639        assert!(
1640            !pipeline_token.is_cancelled(),
1641            "pipeline token should NOT be cancelled when consumer is cancelled"
1642        );
1643
1644        // Verify independent control works the other way
1645        let consumer_token2 = CancellationToken::new();
1646        let pipeline_token2 = CancellationToken::new();
1647
1648        pipeline_token2.cancel();
1649        assert!(
1650            !consumer_token2.is_cancelled(),
1651            "consumer token should NOT be cancelled when pipeline is cancelled"
1652        );
1653        assert!(
1654            pipeline_token2.is_cancelled(),
1655            "pipeline token should be cancelled"
1656        );
1657
1658        // This demonstrates the scaffolding is in place for Task 3:
1659        // - Consumer and pipeline have independent CancellationToken fields
1660        // - They can be controlled separately without affecting each other
1661    }
1662
1663    /// Test verifying stop_route cancels both consumer and pipeline tokens.
1664    #[tokio::test]
1665    async fn test_stop_cancels_both_consumer_and_pipeline_tokens() {
1666        use camel_api::IdentityProcessor;
1667        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
1668
1669        // Consumer that tracks cancellation
1670        struct TrackingConsumer {
1671            cancelled: Arc<std::sync::atomic::AtomicBool>,
1672        }
1673        #[async_trait::async_trait]
1674        impl camel_component::Consumer for TrackingConsumer {
1675            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1676                // Wait for cancellation signal using the public API
1677                ctx.cancelled().await;
1678                self.cancelled
1679                    .store(true, std::sync::atomic::Ordering::SeqCst);
1680                Ok(())
1681            }
1682            async fn stop(&mut self) -> Result<(), CamelError> {
1683                Ok(())
1684            }
1685            fn concurrency_model(&self) -> ConcurrencyModel {
1686                ConcurrencyModel::Sequential
1687            }
1688        }
1689        struct TrackingEndpoint {
1690            cancelled: Arc<std::sync::atomic::AtomicBool>,
1691        }
1692        impl Endpoint for TrackingEndpoint {
1693            fn uri(&self) -> &str {
1694                "tracking:test"
1695            }
1696            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1697                Ok(Box::new(TrackingConsumer {
1698                    cancelled: Arc::clone(&self.cancelled),
1699                }))
1700            }
1701            fn create_producer(
1702                &self,
1703                _ctx: &camel_api::ProducerContext,
1704            ) -> Result<camel_api::BoxProcessor, CamelError> {
1705                Ok(camel_api::BoxProcessor::new(IdentityProcessor))
1706            }
1707        }
1708        struct TrackingComponent {
1709            cancelled: Arc<std::sync::atomic::AtomicBool>,
1710        }
1711        impl camel_component::Component for TrackingComponent {
1712            fn scheme(&self) -> &str {
1713                "tracking"
1714            }
1715            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1716                Ok(Box::new(TrackingEndpoint {
1717                    cancelled: Arc::clone(&self.cancelled),
1718                }))
1719            }
1720        }
1721
1722        let cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false));
1723        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1724        registry.lock().unwrap().register(TrackingComponent {
1725            cancelled: Arc::clone(&cancelled),
1726        });
1727        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
1728        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1729            DefaultRouteController::new(Arc::clone(&registry)),
1730        ));
1731        controller.set_self_ref(self_ref);
1732
1733        let def = crate::route::RouteDefinition::new("tracking:test", vec![])
1734            .with_route_id("tracking-route");
1735        controller.add_route(def).unwrap();
1736
1737        // Start the route
1738        controller.start_route("tracking-route").await.unwrap();
1739        assert_eq!(
1740            controller.route_status("tracking-route"),
1741            Some(RouteStatus::Started)
1742        );
1743
1744        // Stop the route - should cancel both tokens
1745        controller.stop_route("tracking-route").await.unwrap();
1746
1747        // Wait for consumer to observe cancellation
1748        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1749
1750        // Verify the consumer was cancelled (proving consumer_cancel_token was cancelled)
1751        assert!(
1752            cancelled.load(std::sync::atomic::Ordering::SeqCst),
1753            "consumer should have been cancelled via consumer_cancel_token"
1754        );
1755
1756        // Verify route is stopped
1757        assert_eq!(
1758            controller.route_status("tracking-route"),
1759            Some(RouteStatus::Stopped)
1760        );
1761
1762        // Verify tokens are recreated for next start
1763        controller.start_route("tracking-route").await.unwrap();
1764        assert_eq!(
1765            controller.route_status("tracking-route"),
1766            Some(RouteStatus::Started)
1767        );
1768        controller.stop_route("tracking-route").await.unwrap();
1769    }
1770
1771    /// Test that suspend_route cancels consumer but keeps pipeline running.
1772    #[tokio::test]
1773    async fn test_suspend() {
1774        use camel_api::IdentityProcessor;
1775        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
1776        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1777
1778        // Consumer that tracks start/stop and can be controlled
1779        struct SuspendableConsumer {
1780            started: Arc<AtomicUsize>,
1781            cancelled: Arc<AtomicBool>,
1782        }
1783        #[async_trait::async_trait]
1784        impl camel_component::Consumer for SuspendableConsumer {
1785            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1786                self.started.fetch_add(1, Ordering::SeqCst);
1787                // Wait for cancellation
1788                ctx.cancelled().await;
1789                self.cancelled.store(true, Ordering::SeqCst);
1790                Ok(())
1791            }
1792            async fn stop(&mut self) -> Result<(), CamelError> {
1793                Ok(())
1794            }
1795            fn concurrency_model(&self) -> ConcurrencyModel {
1796                ConcurrencyModel::Sequential
1797            }
1798        }
1799        struct SuspendableEndpoint {
1800            started: Arc<AtomicUsize>,
1801            cancelled: Arc<AtomicBool>,
1802        }
1803        impl Endpoint for SuspendableEndpoint {
1804            fn uri(&self) -> &str {
1805                "suspend:test"
1806            }
1807            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1808                Ok(Box::new(SuspendableConsumer {
1809                    started: Arc::clone(&self.started),
1810                    cancelled: Arc::clone(&self.cancelled),
1811                }))
1812            }
1813            fn create_producer(
1814                &self,
1815                _ctx: &camel_api::ProducerContext,
1816            ) -> Result<camel_api::BoxProcessor, CamelError> {
1817                Ok(camel_api::BoxProcessor::new(IdentityProcessor))
1818            }
1819        }
1820        struct SuspendableComponent {
1821            started: Arc<AtomicUsize>,
1822            cancelled: Arc<AtomicBool>,
1823        }
1824        impl camel_component::Component for SuspendableComponent {
1825            fn scheme(&self) -> &str {
1826                "suspend"
1827            }
1828            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1829                Ok(Box::new(SuspendableEndpoint {
1830                    started: Arc::clone(&self.started),
1831                    cancelled: Arc::clone(&self.cancelled),
1832                }))
1833            }
1834        }
1835
1836        let started = Arc::new(AtomicUsize::new(0));
1837        let cancelled = Arc::new(AtomicBool::new(false));
1838        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1839        registry.lock().unwrap().register(SuspendableComponent {
1840            started: Arc::clone(&started),
1841            cancelled: Arc::clone(&cancelled),
1842        });
1843        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
1844        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1845            DefaultRouteController::new(Arc::clone(&registry)),
1846        ));
1847        controller.set_self_ref(self_ref);
1848
1849        let def = crate::route::RouteDefinition::new("suspend:test", vec![])
1850            .with_route_id("suspend-route");
1851        controller.add_route(def).unwrap();
1852
1853        // Start the route
1854        controller.start_route("suspend-route").await.unwrap();
1855        assert_eq!(
1856            controller.route_status("suspend-route"),
1857            Some(RouteStatus::Started)
1858        );
1859
1860        // Give the consumer time to start
1861        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1862        assert_eq!(started.load(Ordering::SeqCst), 1);
1863
1864        // Suspend the route
1865        controller.suspend_route("suspend-route").await.unwrap();
1866        assert_eq!(
1867            controller.route_status("suspend-route"),
1868            Some(RouteStatus::Suspended)
1869        );
1870
1871        // Wait for consumer to observe cancellation
1872        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1873
1874        // Verify consumer was cancelled
1875        assert!(
1876            cancelled.load(Ordering::SeqCst),
1877            "consumer should have been cancelled during suspend"
1878        );
1879
1880        // Verify start_route on Suspended returns error
1881        let result = controller.start_route("suspend-route").await;
1882        assert!(result.is_err());
1883        let err = result.unwrap_err().to_string();
1884        assert!(
1885            err.contains("suspended"),
1886            "error should mention 'suspended', got: {}",
1887            err
1888        );
1889
1890        // Stop the suspended route
1891        controller.stop_route("suspend-route").await.unwrap();
1892        assert_eq!(
1893            controller.route_status("suspend-route"),
1894            Some(RouteStatus::Stopped)
1895        );
1896    }
1897
1898    /// Test that resume_route spawns only consumer task, reusing the pipeline.
1899    #[tokio::test]
1900    async fn test_resume() {
1901        use camel_api::IdentityProcessor;
1902        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
1903        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1904
1905        // Consumer that tracks start count and can be controlled
1906        struct ResumableConsumer {
1907            started: Arc<AtomicUsize>,
1908            cancelled: Arc<AtomicBool>,
1909        }
1910        #[async_trait::async_trait]
1911        impl camel_component::Consumer for ResumableConsumer {
1912            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1913                self.started.fetch_add(1, Ordering::SeqCst);
1914                // Reset cancelled flag on each start
1915                self.cancelled.store(false, Ordering::SeqCst);
1916                // Wait for cancellation
1917                ctx.cancelled().await;
1918                self.cancelled.store(true, Ordering::SeqCst);
1919                Ok(())
1920            }
1921            async fn stop(&mut self) -> Result<(), CamelError> {
1922                Ok(())
1923            }
1924            fn concurrency_model(&self) -> ConcurrencyModel {
1925                ConcurrencyModel::Sequential
1926            }
1927        }
1928        struct ResumableEndpoint {
1929            started: Arc<AtomicUsize>,
1930            cancelled: Arc<AtomicBool>,
1931        }
1932        impl Endpoint for ResumableEndpoint {
1933            fn uri(&self) -> &str {
1934                "resume:test"
1935            }
1936            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1937                Ok(Box::new(ResumableConsumer {
1938                    started: Arc::clone(&self.started),
1939                    cancelled: Arc::clone(&self.cancelled),
1940                }))
1941            }
1942            fn create_producer(
1943                &self,
1944                _ctx: &camel_api::ProducerContext,
1945            ) -> Result<camel_api::BoxProcessor, CamelError> {
1946                Ok(camel_api::BoxProcessor::new(IdentityProcessor))
1947            }
1948        }
1949        struct ResumableComponent {
1950            started: Arc<AtomicUsize>,
1951            cancelled: Arc<AtomicBool>,
1952        }
1953        impl camel_component::Component for ResumableComponent {
1954            fn scheme(&self) -> &str {
1955                "resume"
1956            }
1957            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1958                Ok(Box::new(ResumableEndpoint {
1959                    started: Arc::clone(&self.started),
1960                    cancelled: Arc::clone(&self.cancelled),
1961                }))
1962            }
1963        }
1964
1965        let started = Arc::new(AtomicUsize::new(0));
1966        let cancelled = Arc::new(AtomicBool::new(false));
1967        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
1968        registry.lock().unwrap().register(ResumableComponent {
1969            started: Arc::clone(&started),
1970            cancelled: Arc::clone(&cancelled),
1971        });
1972        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
1973        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
1974            DefaultRouteController::new(Arc::clone(&registry)),
1975        ));
1976        controller.set_self_ref(self_ref);
1977
1978        let def =
1979            crate::route::RouteDefinition::new("resume:test", vec![]).with_route_id("resume-route");
1980        controller.add_route(def).unwrap();
1981
1982        // Start the route
1983        controller.start_route("resume-route").await.unwrap();
1984        assert_eq!(
1985            controller.route_status("resume-route"),
1986            Some(RouteStatus::Started)
1987        );
1988
1989        // Give the consumer time to start
1990        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1991        assert_eq!(started.load(Ordering::SeqCst), 1);
1992
1993        // Suspend the route
1994        controller.suspend_route("resume-route").await.unwrap();
1995        assert_eq!(
1996            controller.route_status("resume-route"),
1997            Some(RouteStatus::Suspended)
1998        );
1999
2000        // Wait for consumer to observe cancellation
2001        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2002        assert!(cancelled.load(Ordering::SeqCst));
2003
2004        // Resume the route - should create a new consumer (started count increases)
2005        controller.resume_route("resume-route").await.unwrap();
2006        assert_eq!(
2007            controller.route_status("resume-route"),
2008            Some(RouteStatus::Started)
2009        );
2010
2011        // Give the new consumer time to start
2012        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2013
2014        // Verify consumer was started again (new consumer instance)
2015        assert_eq!(
2016            started.load(Ordering::SeqCst),
2017            2,
2018            "consumer should have been started twice (initial + resume)"
2019        );
2020
2021        // Verify the new consumer is not cancelled
2022        assert!(
2023            !cancelled.load(Ordering::SeqCst),
2024            "new consumer should not be cancelled after resume"
2025        );
2026
2027        // Test that resume on non-suspended route fails
2028        let result = controller.resume_route("resume-route").await;
2029        assert!(result.is_err());
2030        let err = result.unwrap_err().to_string();
2031        assert!(
2032            err.contains("not suspended") || err.contains("expected Suspended"),
2033            "error should indicate route is not suspended, got: {}",
2034            err
2035        );
2036
2037        // Clean up
2038        controller.stop_route("resume-route").await.unwrap();
2039        assert_eq!(
2040            controller.route_status("resume-route"),
2041            Some(RouteStatus::Stopped)
2042        );
2043
2044        // Test that resume on stopped route fails
2045        let result = controller.resume_route("resume-route").await;
2046        assert!(result.is_err());
2047    }
2048
2049    /// Test that resume on non-existent route fails.
2050    #[tokio::test]
2051    async fn test_resume_non_existent_route() {
2052        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2053        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
2054        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
2055            DefaultRouteController::new(Arc::clone(&registry)),
2056        ));
2057        controller.set_self_ref(self_ref);
2058
2059        let result = controller.resume_route("non-existent").await;
2060        assert!(result.is_err());
2061        let err = result.unwrap_err().to_string();
2062        assert!(
2063            err.contains("not found"),
2064            "error should mention 'not found', got: {}",
2065            err
2066        );
2067    }
2068
2069    /// Test that suspend on non-started route fails.
2070    #[tokio::test]
2071    async fn test_suspend_non_started_route() {
2072        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2073        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
2074        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
2075            DefaultRouteController::new(Arc::clone(&registry)),
2076        ));
2077        controller.set_self_ref(self_ref);
2078
2079        let def =
2080            crate::route::RouteDefinition::new("timer:tick", vec![]).with_route_id("test-route");
2081        controller.add_route(def).unwrap();
2082
2083        // Suspend on stopped route should fail
2084        let result = controller.suspend_route("test-route").await;
2085        assert!(result.is_err());
2086        let err = result.unwrap_err().to_string();
2087        assert!(
2088            err.contains("Cannot suspend") || err.contains("Started"),
2089            "error should indicate cannot suspend, got: {}",
2090            err
2091        );
2092    }
2093
2094    /// Regression test: stop from Suspended state must cancel BOTH consumer AND pipeline.
2095    ///
2096    /// This test ensures the suspend/resume refactor didn't break stop semantics.
2097    /// When a route is suspended, the pipeline is still running (waiting on channel).
2098    /// Calling stop_route from Suspended must cancel BOTH the consumer AND the pipeline.
2099    #[tokio::test]
2100    async fn test_stop_from_suspended_cancels_consumer_and_pipeline() {
2101        use camel_api::IdentityProcessor;
2102        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
2103        use std::sync::atomic::{AtomicBool, Ordering};
2104
2105        // Consumer that tracks cancellation
2106        struct CancellableConsumer {
2107            cancelled: Arc<AtomicBool>,
2108        }
2109        #[async_trait::async_trait]
2110        impl camel_component::Consumer for CancellableConsumer {
2111            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
2112                ctx.cancelled().await;
2113                self.cancelled.store(true, Ordering::SeqCst);
2114                Ok(())
2115            }
2116            async fn stop(&mut self) -> Result<(), CamelError> {
2117                Ok(())
2118            }
2119            fn concurrency_model(&self) -> ConcurrencyModel {
2120                ConcurrencyModel::Sequential
2121            }
2122        }
2123        struct CancellableEndpoint {
2124            cancelled: Arc<AtomicBool>,
2125        }
2126        impl Endpoint for CancellableEndpoint {
2127            fn uri(&self) -> &str {
2128                "cancellable:test"
2129            }
2130            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
2131                Ok(Box::new(CancellableConsumer {
2132                    cancelled: Arc::clone(&self.cancelled),
2133                }))
2134            }
2135            fn create_producer(
2136                &self,
2137                _ctx: &camel_api::ProducerContext,
2138            ) -> Result<camel_api::BoxProcessor, CamelError> {
2139                Ok(camel_api::BoxProcessor::new(IdentityProcessor))
2140            }
2141        }
2142        struct CancellableComponent {
2143            cancelled: Arc<AtomicBool>,
2144        }
2145        impl camel_component::Component for CancellableComponent {
2146            fn scheme(&self) -> &str {
2147                "cancellable"
2148            }
2149            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
2150                Ok(Box::new(CancellableEndpoint {
2151                    cancelled: Arc::clone(&self.cancelled),
2152                }))
2153            }
2154        }
2155
2156        let consumer_cancelled = Arc::new(AtomicBool::new(false));
2157        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2158        registry.lock().unwrap().register(CancellableComponent {
2159            cancelled: Arc::clone(&consumer_cancelled),
2160        });
2161        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
2162        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
2163            DefaultRouteController::new(Arc::clone(&registry)),
2164        ));
2165        controller.set_self_ref(self_ref);
2166
2167        let def = crate::route::RouteDefinition::new("cancellable:test", vec![])
2168            .with_route_id("stop-from-suspended-route");
2169        controller.add_route(def).unwrap();
2170
2171        // Start the route
2172        controller
2173            .start_route("stop-from-suspended-route")
2174            .await
2175            .unwrap();
2176        assert_eq!(
2177            controller.route_status("stop-from-suspended-route"),
2178            Some(RouteStatus::Started)
2179        );
2180
2181        // Suspend the route (consumer cancelled, pipeline still running)
2182        controller
2183            .suspend_route("stop-from-suspended-route")
2184            .await
2185            .unwrap();
2186        assert_eq!(
2187            controller.route_status("stop-from-suspended-route"),
2188            Some(RouteStatus::Suspended)
2189        );
2190
2191        // Wait for consumer to observe cancellation from suspend
2192        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2193        assert!(
2194            consumer_cancelled.load(Ordering::SeqCst),
2195            "consumer should be cancelled after suspend"
2196        );
2197
2198        // Reset the flag to detect if stop re-cancels the consumer token
2199        consumer_cancelled.store(false, Ordering::SeqCst);
2200
2201        // STOP from Suspended state - this is the critical regression test
2202        // Must cancel BOTH consumer token (even though consumer isn't running) AND pipeline token
2203        controller
2204            .stop_route("stop-from-suspended-route")
2205            .await
2206            .unwrap();
2207
2208        // Verify final state is Stopped
2209        assert_eq!(
2210            controller.route_status("stop-from-suspended-route"),
2211            Some(RouteStatus::Stopped),
2212            "route must be Stopped after stop_route from Suspended"
2213        );
2214
2215        // Wait for pipeline task to observe cancellation
2216        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2217
2218        // The route can be started again, proving clean shutdown
2219        controller
2220            .start_route("stop-from-suspended-route")
2221            .await
2222            .unwrap();
2223        assert_eq!(
2224            controller.route_status("stop-from-suspended-route"),
2225            Some(RouteStatus::Started)
2226        );
2227
2228        controller
2229            .stop_route("stop-from-suspended-route")
2230            .await
2231            .unwrap();
2232    }
2233
2234    /// Test full suspend/resume cycle with stop.
2235    #[tokio::test]
2236    async fn test_suspend_resume_stop_cycle() {
2237        use camel_api::IdentityProcessor;
2238        use camel_component::{ConcurrencyModel, ConsumerContext, Endpoint};
2239
2240        // Simple consumer that waits for cancellation
2241        struct SimpleConsumer;
2242        #[async_trait::async_trait]
2243        impl camel_component::Consumer for SimpleConsumer {
2244            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
2245                ctx.cancelled().await;
2246                Ok(())
2247            }
2248            async fn stop(&mut self) -> Result<(), CamelError> {
2249                Ok(())
2250            }
2251            fn concurrency_model(&self) -> ConcurrencyModel {
2252                ConcurrencyModel::Sequential
2253            }
2254        }
2255        struct SimpleEndpoint;
2256        impl Endpoint for SimpleEndpoint {
2257            fn uri(&self) -> &str {
2258                "simple:test"
2259            }
2260            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
2261                Ok(Box::new(SimpleConsumer))
2262            }
2263            fn create_producer(
2264                &self,
2265                _ctx: &camel_api::ProducerContext,
2266            ) -> Result<camel_api::BoxProcessor, CamelError> {
2267                Ok(camel_api::BoxProcessor::new(IdentityProcessor))
2268            }
2269        }
2270        struct SimpleComponent;
2271        impl camel_component::Component for SimpleComponent {
2272            fn scheme(&self) -> &str {
2273                "simple"
2274            }
2275            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
2276                Ok(Box::new(SimpleEndpoint))
2277            }
2278        }
2279
2280        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
2281        registry.lock().unwrap().register(SimpleComponent);
2282        let mut controller = DefaultRouteController::new(Arc::clone(&registry));
2283        let self_ref: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(
2284            DefaultRouteController::new(Arc::clone(&registry)),
2285        ));
2286        controller.set_self_ref(self_ref);
2287
2288        let def =
2289            crate::route::RouteDefinition::new("simple:test", vec![]).with_route_id("cycle-route");
2290        controller.add_route(def).unwrap();
2291
2292        // Full cycle: start -> suspend -> resume -> suspend -> stop
2293        controller.start_route("cycle-route").await.unwrap();
2294        assert_eq!(
2295            controller.route_status("cycle-route"),
2296            Some(RouteStatus::Started)
2297        );
2298
2299        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2300
2301        controller.suspend_route("cycle-route").await.unwrap();
2302        assert_eq!(
2303            controller.route_status("cycle-route"),
2304            Some(RouteStatus::Suspended)
2305        );
2306
2307        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2308
2309        controller.resume_route("cycle-route").await.unwrap();
2310        assert_eq!(
2311            controller.route_status("cycle-route"),
2312            Some(RouteStatus::Started)
2313        );
2314
2315        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2316
2317        controller.suspend_route("cycle-route").await.unwrap();
2318        assert_eq!(
2319            controller.route_status("cycle-route"),
2320            Some(RouteStatus::Suspended)
2321        );
2322
2323        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2324
2325        // Stop from suspended state
2326        controller.stop_route("cycle-route").await.unwrap();
2327        assert_eq!(
2328            controller.route_status("cycle-route"),
2329            Some(RouteStatus::Stopped)
2330        );
2331
2332        // Can start again after stop
2333        controller.start_route("cycle-route").await.unwrap();
2334        assert_eq!(
2335            controller.route_status("cycle-route"),
2336            Some(RouteStatus::Started)
2337        );
2338
2339        controller.stop_route("cycle-route").await.unwrap();
2340    }
2341}