Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::Mutex;
7use tokio_util::sync::CancellationToken;
8use tracing::{info, warn};
9
10use camel_api::error_handler::ErrorHandlerConfig;
11use camel_api::{
12    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
13    RouteController, RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus,
14    SupervisionConfig,
15};
16use camel_component::Component;
17use camel_language_api::Language;
18use camel_language_api::LanguageError;
19
20use crate::lifecycle::adapters::RuntimeExecutionAdapter;
21use crate::lifecycle::adapters::redb_journal::{RedbJournalOptions, RedbRuntimeEventJournal};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::application::supervision_service::SupervisingRouteController;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
34///
35/// # Lifecycle
36///
37/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
38/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
39/// stopped context is not supported — create a new instance instead.
40pub struct CamelContext {
41    registry: Arc<std::sync::Mutex<Registry>>,
42    route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
43    runtime: Arc<RuntimeBus>,
44    cancel_token: CancellationToken,
45    metrics: Arc<dyn MetricsCollector>,
46    languages: SharedLanguageRegistry,
47    shutdown_timeout: std::time::Duration,
48    services: Vec<Box<dyn Lifecycle>>,
49    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
50}
51
52/// Opaque handle for runtime side-effect execution operations.
53///
54/// This intentionally does not expose direct lifecycle mutation APIs to callers.
55#[derive(Clone)]
56pub struct RuntimeExecutionHandle {
57    controller: Arc<Mutex<dyn RouteControllerInternal>>,
58    runtime: Arc<RuntimeBus>,
59}
60
61impl RuntimeExecutionHandle {
62    pub(crate) async fn add_route_definition(
63        &self,
64        definition: RouteDefinition,
65    ) -> Result<(), CamelError> {
66        use crate::lifecycle::ports::RouteRegistrationPort;
67        self.runtime.register_route(definition).await
68    }
69
70    pub(crate) async fn compile_route_definition(
71        &self,
72        definition: RouteDefinition,
73    ) -> Result<camel_api::BoxProcessor, CamelError> {
74        let controller = self.controller.lock().await;
75        controller.compile_route_definition(definition)
76    }
77
78    pub(crate) async fn swap_route_pipeline(
79        &self,
80        route_id: &str,
81        pipeline: camel_api::BoxProcessor,
82    ) -> Result<(), CamelError> {
83        let controller = self.controller.lock().await;
84        controller.swap_pipeline(route_id, pipeline)
85    }
86
87    pub(crate) async fn execute_runtime_command(
88        &self,
89        cmd: camel_api::RuntimeCommand,
90    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
91        self.runtime.execute(cmd).await
92    }
93
94    pub(crate) async fn runtime_route_status(
95        &self,
96        route_id: &str,
97    ) -> Result<Option<String>, CamelError> {
98        match self
99            .runtime
100            .ask(camel_api::RuntimeQuery::GetRouteStatus {
101                route_id: route_id.to_string(),
102            })
103            .await
104        {
105            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
106            Ok(_) => Err(CamelError::RouteError(
107                "unexpected runtime query response for route status".to_string(),
108            )),
109            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
110            Err(err) => Err(err),
111        }
112    }
113
114    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
115        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
116            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
117            Ok(_) => Err(CamelError::RouteError(
118                "unexpected runtime query response for route listing".to_string(),
119            )),
120            Err(err) => Err(err),
121        }
122    }
123
124    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
125        let controller = self.controller.lock().await;
126        if !controller.route_exists(route_id) {
127            return Err(CamelError::RouteError(format!(
128                "Route '{}' not found",
129                route_id
130            )));
131        }
132        // in_flight_count returns Some(N) when route exists.
133        // Some(0) means either "truly zero in-flight" or "no UoW tracking configured".
134        // Both cases are safe to treat as "drained, proceed immediately".
135        Ok(controller.in_flight_count(route_id).unwrap_or(0))
136    }
137
138    #[cfg(test)]
139    pub(crate) async fn force_start_route_for_test(
140        &self,
141        route_id: &str,
142    ) -> Result<(), CamelError> {
143        let mut controller = self.controller.lock().await;
144        controller.start_route(route_id).await
145    }
146
147    #[cfg(test)]
148    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
149        let controller = self.controller.lock().await;
150        controller.route_count()
151    }
152}
153
154impl CamelContext {
155    fn built_in_languages() -> SharedLanguageRegistry {
156        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
157        languages.insert(
158            "simple".to_string(),
159            Arc::new(camel_language_simple::SimpleLanguage),
160        );
161        #[cfg(feature = "lang-js")]
162        {
163            let js_lang = camel_language_js::JsLanguage::new();
164            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
165            languages.insert("javascript".to_string(), Arc::new(js_lang));
166        }
167        #[cfg(feature = "lang-rhai")]
168        {
169            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
170            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
171        }
172        #[cfg(feature = "lang-jsonpath")]
173        {
174            languages.insert(
175                "jsonpath".to_string(),
176                Arc::new(camel_language_jsonpath::JsonPathLanguage),
177            );
178        }
179        #[cfg(feature = "lang-xpath")]
180        {
181            languages.insert(
182                "xpath".to_string(),
183                Arc::new(camel_language_xpath::XPathLanguage),
184            );
185        }
186        Arc::new(std::sync::Mutex::new(languages))
187    }
188
189    fn build_runtime(
190        controller: Arc<Mutex<dyn RouteControllerInternal>>,
191        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
192    ) -> Arc<RuntimeBus> {
193        let execution = Arc::new(RuntimeExecutionAdapter::new(Arc::clone(&controller)));
194        Arc::new(
195            RuntimeBus::new(
196                Arc::new(store.clone()),
197                Arc::new(store.clone()),
198                Arc::new(store.clone()),
199                Arc::new(store.clone()),
200            )
201            .with_uow(Arc::new(store))
202            .with_execution(execution),
203        )
204    }
205
206    /// Create a new, empty CamelContext.
207    ///
208    /// Runtime state is in-memory/ephemeral by default.
209    pub fn new() -> Self {
210        Self::with_metrics(Arc::new(NoOpMetrics))
211    }
212
213    /// Create a new CamelContext with a custom metrics collector.
214    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
215        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
216        let languages = Self::built_in_languages();
217        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
218            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
219        ));
220        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
221        let runtime = Self::build_runtime(Arc::clone(&controller), store);
222
223        // Set self-ref so DefaultRouteController can create ProducerContext
224        // Use try_lock since we just created it and nobody else has access yet
225        let mut controller_guard = controller
226            .try_lock()
227            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
228        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
229        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
230        controller_guard.set_runtime_handle(runtime_handle);
231        drop(controller_guard);
232
233        Self {
234            registry,
235            route_controller: controller,
236            runtime,
237            cancel_token: CancellationToken::new(),
238            metrics,
239            languages,
240            shutdown_timeout: std::time::Duration::from_secs(30),
241            services: Vec::new(),
242            component_configs: HashMap::new(),
243        }
244    }
245
246    /// Create a new CamelContext with route supervision enabled.
247    ///
248    /// The supervision config controls automatic restart behavior for crashed routes.
249    pub fn with_supervision(config: SupervisionConfig) -> Self {
250        Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
251    }
252
253    /// Create a new CamelContext with route supervision and custom metrics.
254    ///
255    /// The supervision config controls automatic restart behavior for crashed routes.
256    pub fn with_supervision_and_metrics(
257        config: SupervisionConfig,
258        metrics: Arc<dyn MetricsCollector>,
259    ) -> Self {
260        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
261        let languages = Self::built_in_languages();
262        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
263            SupervisingRouteController::with_languages(
264                Arc::clone(&registry),
265                config,
266                Arc::clone(&languages),
267            )
268            .with_metrics(Arc::clone(&metrics)),
269        ));
270        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
271        let runtime = Self::build_runtime(Arc::clone(&controller), store);
272
273        // Set self-ref so SupervisingRouteController can create ProducerContext
274        // Use try_lock since we just created it and nobody else has access yet
275        let mut controller_guard = controller
276            .try_lock()
277            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
278        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
279        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
280        controller_guard.set_runtime_handle(runtime_handle);
281        drop(controller_guard);
282
283        Self {
284            registry,
285            route_controller: controller,
286            runtime,
287            cancel_token: CancellationToken::new(),
288            metrics,
289            languages,
290            shutdown_timeout: std::time::Duration::from_secs(30),
291            services: Vec::new(),
292            component_configs: HashMap::new(),
293        }
294    }
295
296    /// Create a new CamelContext backed by a redb runtime journal.
297    pub async fn new_with_redb_journal(
298        path: impl Into<PathBuf>,
299        options: RedbJournalOptions,
300    ) -> Result<Self, CamelError> {
301        Self::with_metrics_and_redb_journal(Arc::new(NoOpMetrics), path, options).await
302    }
303
304    /// Create a new CamelContext with custom metrics and a redb runtime journal.
305    pub async fn with_metrics_and_redb_journal(
306        metrics: Arc<dyn MetricsCollector>,
307        path: impl Into<PathBuf>,
308        options: RedbJournalOptions,
309    ) -> Result<Self, CamelError> {
310        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
311        let languages = Self::built_in_languages();
312        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
313            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
314        ));
315        let journal = RedbRuntimeEventJournal::new(path, options).await?;
316        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default()
317            .with_journal(Arc::new(journal));
318        let runtime = Self::build_runtime(Arc::clone(&controller), store);
319
320        // Set self-ref so DefaultRouteController can create ProducerContext
321        // Use try_lock since we just created it and nobody else has access yet
322        let mut controller_guard = controller
323            .try_lock()
324            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
325        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
326        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
327        controller_guard.set_runtime_handle(runtime_handle);
328        drop(controller_guard);
329
330        Ok(Self {
331            registry,
332            route_controller: controller,
333            runtime,
334            cancel_token: CancellationToken::new(),
335            metrics,
336            languages,
337            shutdown_timeout: std::time::Duration::from_secs(30),
338            services: Vec::new(),
339            component_configs: HashMap::new(),
340        })
341    }
342
343    /// Create a new CamelContext with supervision, custom metrics, and a redb runtime journal.
344    pub async fn with_supervision_and_metrics_and_redb_journal(
345        config: SupervisionConfig,
346        metrics: Arc<dyn MetricsCollector>,
347        path: impl Into<PathBuf>,
348        options: RedbJournalOptions,
349    ) -> Result<Self, CamelError> {
350        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
351        let languages = Self::built_in_languages();
352        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
353            SupervisingRouteController::with_languages(
354                Arc::clone(&registry),
355                config,
356                Arc::clone(&languages),
357            )
358            .with_metrics(Arc::clone(&metrics)),
359        ));
360        let journal = RedbRuntimeEventJournal::new(path, options).await?;
361        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default()
362            .with_journal(Arc::new(journal));
363        let runtime = Self::build_runtime(Arc::clone(&controller), store);
364
365        // Set self-ref so SupervisingRouteController can create ProducerContext
366        // Use try_lock since we just created it and nobody else has access yet
367        let mut controller_guard = controller
368            .try_lock()
369            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
370        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
371        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
372        controller_guard.set_runtime_handle(runtime_handle);
373        drop(controller_guard);
374
375        Ok(Self {
376            registry,
377            route_controller: controller,
378            runtime,
379            cancel_token: CancellationToken::new(),
380            metrics,
381            languages,
382            shutdown_timeout: std::time::Duration::from_secs(30),
383            services: Vec::new(),
384            component_configs: HashMap::new(),
385        })
386    }
387
388    /// Set a global error handler applied to all routes without a per-route handler.
389    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
390        self.route_controller
391            .try_lock()
392            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
393            .set_error_handler(config);
394    }
395
396    /// Enable or disable tracing globally.
397    pub fn set_tracing(&mut self, enabled: bool) {
398        self.route_controller
399            .try_lock()
400            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
401            .set_tracer_config(&TracerConfig {
402                enabled,
403                ..Default::default()
404            });
405    }
406
407    /// Configure tracing with full config.
408    pub fn set_tracer_config(&mut self, config: TracerConfig) {
409        // Inject metrics collector if not already set
410        let config = if config.metrics_collector.is_none() {
411            TracerConfig {
412                metrics_collector: Some(Arc::clone(&self.metrics)),
413                ..config
414            }
415        } else {
416            config
417        };
418
419        self.route_controller
420            .try_lock()
421            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
422            .set_tracer_config(&config);
423    }
424
425    /// Builder-style: enable tracing with default config.
426    pub fn with_tracing(mut self) -> Self {
427        self.set_tracing(true);
428        self
429    }
430
431    /// Builder-style: configure tracing with custom config.
432    /// Note: tracing subscriber initialization (stdout/file output) is handled
433    /// separately via init_tracing_subscriber (called in camel-config bridge).
434    pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
435        self.set_tracer_config(config);
436        self
437    }
438
439    /// Register a lifecycle service (Apache Camel: addService pattern)
440    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
441        // Auto-register MetricsCollector if available
442        if let Some(collector) = service.as_metrics_collector() {
443            self.metrics = collector;
444        }
445
446        self.services.push(Box::new(service));
447        self
448    }
449
450    /// Register a component with this context.
451    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
452        info!(scheme = component.scheme(), "Registering component");
453        self.registry
454            .lock()
455            .expect("mutex poisoned: another thread panicked while holding this lock")
456            .register(component);
457    }
458
459    /// Register a language with this context, keyed by name.
460    ///
461    /// Returns `Err(LanguageError::AlreadyRegistered)` if a language with the
462    /// same name is already registered. Use [`resolve_language`](Self::resolve_language)
463    /// to check before registering, or choose a distinct name.
464    pub fn register_language(
465        &mut self,
466        name: impl Into<String>,
467        lang: Box<dyn Language>,
468    ) -> Result<(), LanguageError> {
469        let name = name.into();
470        let mut languages = self
471            .languages
472            .lock()
473            .expect("mutex poisoned: another thread panicked while holding this lock");
474        if languages.contains_key(&name) {
475            return Err(LanguageError::AlreadyRegistered(name));
476        }
477        languages.insert(name, Arc::from(lang));
478        Ok(())
479    }
480
481    /// Resolve a language by name. Returns `None` if not registered.
482    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
483        let languages = self
484            .languages
485            .lock()
486            .expect("mutex poisoned: another thread panicked while holding this lock");
487        languages.get(name).cloned()
488    }
489
490    /// Add a route definition to this context.
491    ///
492    /// The route must have an ID. Steps are resolved immediately using registered components.
493    pub async fn add_route_definition(
494        &self,
495        definition: RouteDefinition,
496    ) -> Result<(), CamelError> {
497        use crate::lifecycle::ports::RouteRegistrationPort;
498        info!(
499            from = definition.from_uri(),
500            route_id = %definition.route_id(),
501            "Adding route definition"
502        );
503        self.runtime.register_route(definition).await
504    }
505
506    fn next_context_command_id(op: &str, route_id: &str) -> String {
507        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
508        format!("context:{op}:{route_id}:{seq}")
509    }
510
511    /// Access the component registry.
512    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
513        self.registry
514            .lock()
515            .expect("mutex poisoned: another thread panicked while holding this lock")
516    }
517
518    /// Get runtime execution handle for file-watcher integrations.
519    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
520        RuntimeExecutionHandle {
521            controller: Arc::clone(&self.route_controller),
522            runtime: Arc::clone(&self.runtime),
523        }
524    }
525
526    /// Get the metrics collector.
527    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
528        Arc::clone(&self.metrics)
529    }
530
531    /// Get runtime command/query bus handle.
532    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
533        self.runtime.clone()
534    }
535
536    /// Build a producer context wired to this runtime.
537    pub fn producer_context(&self) -> camel_api::ProducerContext {
538        camel_api::ProducerContext::new().with_runtime(self.runtime())
539    }
540
541    /// Query route status via runtime read-model.
542    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
543        match self
544            .runtime()
545            .ask(camel_api::RuntimeQuery::GetRouteStatus {
546                route_id: route_id.to_string(),
547            })
548            .await
549        {
550            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
551            Ok(_) => Err(CamelError::RouteError(
552                "unexpected runtime query response for route status".to_string(),
553            )),
554            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
555            Err(err) => Err(err),
556        }
557    }
558
559    /// Start all routes. Each route's consumer will begin producing exchanges.
560    ///
561    /// Only routes with `auto_startup == true` will be started, in order of their
562    /// `startup_order` (lower values start first).
563    pub async fn start(&mut self) -> Result<(), CamelError> {
564        info!("Starting CamelContext");
565
566        // Start lifecycle services first
567        for (i, service) in self.services.iter_mut().enumerate() {
568            info!("Starting service: {}", service.name());
569            if let Err(e) = service.start().await {
570                // Rollback: stop already started services in reverse order
571                warn!(
572                    "Service {} failed to start, rolling back {} services",
573                    service.name(),
574                    i
575                );
576                for j in (0..i).rev() {
577                    if let Err(rollback_err) = self.services[j].stop().await {
578                        warn!(
579                            "Failed to stop service {} during rollback: {}",
580                            self.services[j].name(),
581                            rollback_err
582                        );
583                    }
584                }
585                return Err(e);
586            }
587        }
588
589        // Then start routes via runtime command bus (aggregate-first),
590        // preserving route controller startup ordering metadata.
591        let route_ids = {
592            let controller = self.route_controller.lock().await;
593            controller.auto_startup_route_ids()
594        };
595        for route_id in route_ids {
596            self.runtime
597                .execute(camel_api::RuntimeCommand::StartRoute {
598                    route_id: route_id.clone(),
599                    command_id: Self::next_context_command_id("start", &route_id),
600                    causation_id: None,
601                })
602                .await?;
603        }
604
605        info!("CamelContext started");
606        Ok(())
607    }
608
609    /// Graceful shutdown with default 30-second timeout.
610    pub async fn stop(&mut self) -> Result<(), CamelError> {
611        self.stop_timeout(self.shutdown_timeout).await
612    }
613
614    /// Graceful shutdown with custom timeout.
615    ///
616    /// Note: The timeout parameter is currently not used directly; the RouteController
617    /// manages its own shutdown timeout. This may change in a future version.
618    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
619        info!("Stopping CamelContext");
620
621        // Signal cancellation (for any legacy code that might use it)
622        self.cancel_token.cancel();
623
624        // Stop all routes via runtime command bus (aggregate-first),
625        // preserving route controller shutdown ordering metadata.
626        let route_ids = {
627            let controller = self.route_controller.lock().await;
628            controller.shutdown_route_ids()
629        };
630        for route_id in route_ids {
631            if let Err(err) = self
632                .runtime
633                .execute(camel_api::RuntimeCommand::StopRoute {
634                    route_id: route_id.clone(),
635                    command_id: Self::next_context_command_id("stop", &route_id),
636                    causation_id: None,
637                })
638                .await
639            {
640                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
641            }
642        }
643
644        // Then stop lifecycle services in reverse insertion order (LIFO)
645        // Continue stopping all services even if some fail
646        let mut first_error = None;
647        for service in self.services.iter_mut().rev() {
648            info!("Stopping service: {}", service.name());
649            if let Err(e) = service.stop().await {
650                warn!("Service {} failed to stop: {}", service.name(), e);
651                if first_error.is_none() {
652                    first_error = Some(e);
653                }
654            }
655        }
656
657        info!("CamelContext stopped");
658
659        if let Some(e) = first_error {
660            Err(e)
661        } else {
662            Ok(())
663        }
664    }
665
666    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
667    pub fn shutdown_timeout(&self) -> std::time::Duration {
668        self.shutdown_timeout
669    }
670
671    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
672    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
673        self.shutdown_timeout = timeout;
674    }
675
676    /// Immediate abort — kills all tasks without draining.
677    pub async fn abort(&mut self) {
678        self.cancel_token.cancel();
679        let route_ids = {
680            let controller = self.route_controller.lock().await;
681            controller.shutdown_route_ids()
682        };
683        for route_id in route_ids {
684            let _ = self
685                .runtime
686                .execute(camel_api::RuntimeCommand::StopRoute {
687                    route_id: route_id.clone(),
688                    command_id: Self::next_context_command_id("abort-stop", &route_id),
689                    causation_id: None,
690                })
691                .await;
692        }
693    }
694
695    /// Check health status of all registered services.
696    pub fn health_check(&self) -> HealthReport {
697        let services: Vec<ServiceHealth> = self
698            .services
699            .iter()
700            .map(|s| ServiceHealth {
701                name: s.name().to_string(),
702                status: s.status(),
703            })
704            .collect();
705
706        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
707            HealthStatus::Healthy
708        } else {
709            HealthStatus::Unhealthy
710        };
711
712        HealthReport {
713            status,
714            services,
715            ..Default::default()
716        }
717    }
718
719    /// Store a component config. Overwrites any previously stored config of the same type.
720    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
721        self.component_configs
722            .insert(TypeId::of::<T>(), Box::new(config));
723    }
724
725    /// Retrieve a stored component config by type. Returns None if not stored.
726    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
727        self.component_configs
728            .get(&TypeId::of::<T>())
729            .and_then(|b| b.downcast_ref::<T>())
730    }
731}
732
733impl Default for CamelContext {
734    fn default() -> Self {
735        Self::new()
736    }
737}
738
739#[cfg(test)]
740mod tests {
741    use super::*;
742    use crate::lifecycle::application::route_definition::{
743        BuilderStep, LanguageExpressionDef, RouteDefinition,
744    };
745    use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState};
746    use async_trait::async_trait;
747    use camel_api::CamelError;
748    use camel_api::{
749        CanonicalRouteSpec, RuntimeCommand, RuntimeCommandResult, RuntimeQuery, RuntimeQueryResult,
750    };
751    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
752
753    /// Mock component for testing
754    struct MockComponent;
755
756    impl Component for MockComponent {
757        fn scheme(&self) -> &str {
758            "mock"
759        }
760
761        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
762            Err(CamelError::ComponentNotFound("mock".to_string()))
763        }
764    }
765
766    struct HoldConsumer;
767
768    #[async_trait]
769    impl Consumer for HoldConsumer {
770        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
771            ctx.cancelled().await;
772            Ok(())
773        }
774
775        async fn stop(&mut self) -> Result<(), CamelError> {
776            Ok(())
777        }
778
779        fn concurrency_model(&self) -> ConcurrencyModel {
780            ConcurrencyModel::Sequential
781        }
782    }
783
784    struct HoldEndpoint;
785
786    impl Endpoint for HoldEndpoint {
787        fn uri(&self) -> &str {
788            "hold:test"
789        }
790
791        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
792            Ok(Box::new(HoldConsumer))
793        }
794
795        fn create_producer(
796            &self,
797            _ctx: &camel_api::ProducerContext,
798        ) -> Result<camel_api::BoxProcessor, CamelError> {
799            Err(CamelError::RouteError("no producer".to_string()))
800        }
801    }
802
803    struct HoldComponent;
804
805    impl Component for HoldComponent {
806        fn scheme(&self) -> &str {
807            "hold"
808        }
809
810        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
811            Ok(Box::new(HoldEndpoint))
812        }
813    }
814
815    #[test]
816    fn test_context_handles_mutex_poisoning_gracefully() {
817        let mut ctx = CamelContext::new();
818
819        // Register a component successfully
820        ctx.register_component(MockComponent);
821
822        // Access registry should work even after potential panic in another thread
823        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
824            let _guard = ctx.registry();
825        }));
826
827        assert!(
828            result.is_ok(),
829            "Registry access should handle mutex poisoning"
830        );
831    }
832
833    #[test]
834    fn test_context_resolves_simple_language() {
835        let ctx = CamelContext::new();
836        let lang = ctx
837            .resolve_language("simple")
838            .expect("simple language not found");
839        assert_eq!(lang.name(), "simple");
840    }
841
842    #[test]
843    fn test_simple_language_via_context() {
844        let ctx = CamelContext::new();
845        let lang = ctx.resolve_language("simple").unwrap();
846        let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
847        let mut msg = camel_api::message::Message::default();
848        msg.set_header("x", camel_api::Value::String("hello".into()));
849        let ex = camel_api::exchange::Exchange::new(msg);
850        assert!(pred.matches(&ex).unwrap());
851    }
852
853    #[test]
854    fn test_resolve_unknown_language_returns_none() {
855        let ctx = CamelContext::new();
856        assert!(ctx.resolve_language("nonexistent").is_none());
857    }
858
859    #[test]
860    fn test_register_language_duplicate_returns_error() {
861        use camel_language_api::LanguageError;
862        struct DummyLang;
863        impl camel_language_api::Language for DummyLang {
864            fn name(&self) -> &'static str {
865                "dummy"
866            }
867            fn create_expression(
868                &self,
869                _: &str,
870            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
871                Err(LanguageError::EvalError("not implemented".into()))
872            }
873            fn create_predicate(
874                &self,
875                _: &str,
876            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
877                Err(LanguageError::EvalError("not implemented".into()))
878            }
879        }
880
881        let mut ctx = CamelContext::new();
882        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
883        let result = ctx.register_language("dummy", Box::new(DummyLang));
884        assert!(result.is_err(), "duplicate registration should fail");
885        let err_msg = result.unwrap_err().to_string();
886        assert!(
887            err_msg.contains("dummy"),
888            "error should mention the language name"
889        );
890    }
891
892    #[test]
893    fn test_register_language_new_key_succeeds() {
894        use camel_language_api::LanguageError;
895        struct DummyLang;
896        impl camel_language_api::Language for DummyLang {
897            fn name(&self) -> &'static str {
898                "dummy"
899            }
900            fn create_expression(
901                &self,
902                _: &str,
903            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
904                Err(LanguageError::EvalError("not implemented".into()))
905            }
906            fn create_predicate(
907                &self,
908                _: &str,
909            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
910                Err(LanguageError::EvalError("not implemented".into()))
911            }
912        }
913
914        let mut ctx = CamelContext::new();
915        let result = ctx.register_language("dummy", Box::new(DummyLang));
916        assert!(result.is_ok(), "first registration should succeed");
917    }
918
919    #[tokio::test]
920    async fn test_add_route_definition_uses_runtime_registered_language() {
921        use camel_language_api::{Expression, LanguageError, Predicate};
922
923        struct DummyExpression;
924        impl Expression for DummyExpression {
925            fn evaluate(
926                &self,
927                _exchange: &camel_api::Exchange,
928            ) -> Result<camel_api::Value, LanguageError> {
929                Ok(camel_api::Value::String("ok".into()))
930            }
931        }
932
933        struct DummyPredicate;
934        impl Predicate for DummyPredicate {
935            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
936                Ok(true)
937            }
938        }
939
940        struct RuntimeLang;
941        impl camel_language_api::Language for RuntimeLang {
942            fn name(&self) -> &'static str {
943                "runtime"
944            }
945
946            fn create_expression(
947                &self,
948                _script: &str,
949            ) -> Result<Box<dyn Expression>, LanguageError> {
950                Ok(Box::new(DummyExpression))
951            }
952
953            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
954                Ok(Box::new(DummyPredicate))
955            }
956        }
957
958        let mut ctx = CamelContext::new();
959        ctx.register_language("runtime", Box::new(RuntimeLang))
960            .unwrap();
961
962        let definition = RouteDefinition::new(
963            "timer:tick",
964            vec![BuilderStep::DeclarativeScript {
965                expression: LanguageExpressionDef {
966                    language: "runtime".into(),
967                    source: "${body}".into(),
968                },
969            }],
970        )
971        .with_route_id("runtime-lang-route");
972
973        let result = ctx.add_route_definition(definition).await;
974        assert!(
975            result.is_ok(),
976            "route should resolve runtime language: {result:?}"
977        );
978    }
979
980    #[tokio::test]
981    async fn test_add_route_definition_fails_for_unregistered_runtime_language() {
982        let ctx = CamelContext::new();
983        let definition = RouteDefinition::new(
984            "timer:tick",
985            vec![BuilderStep::DeclarativeSetBody {
986                value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
987                    language: "missing-lang".into(),
988                    source: "${body}".into(),
989                }),
990            }],
991        )
992        .with_route_id("missing-runtime-lang-route");
993
994        let result = ctx.add_route_definition(definition).await;
995        assert!(
996            result.is_err(),
997            "route should fail when language is missing"
998        );
999        let error_text = result.unwrap_err().to_string();
1000        assert!(
1001            error_text.contains("missing-lang"),
1002            "error should mention missing language, got: {error_text}"
1003        );
1004    }
1005
1006    #[tokio::test]
1007    async fn add_route_definition_does_not_require_mut() {
1008        let ctx = CamelContext::new();
1009        let definition = RouteDefinition::new("timer:tick", vec![]).with_route_id("immutable-ctx");
1010
1011        let result = ctx.add_route_definition(definition).await;
1012        assert!(
1013            result.is_ok(),
1014            "immutable context should add route: {result:?}"
1015        );
1016    }
1017
1018    #[test]
1019    fn test_health_check_empty_context() {
1020        let ctx = CamelContext::new();
1021        let report = ctx.health_check();
1022
1023        assert_eq!(report.status, HealthStatus::Healthy);
1024        assert!(report.services.is_empty());
1025    }
1026
1027    #[tokio::test]
1028    async fn context_exposes_runtime_command_and_query_buses() {
1029        let ctx = CamelContext::new();
1030        let runtime = ctx.runtime();
1031
1032        let register = runtime
1033            .execute(RuntimeCommand::RegisterRoute {
1034                spec: CanonicalRouteSpec::new("runtime-r1", "timer:tick"),
1035                command_id: "cmd-1".into(),
1036                causation_id: None,
1037            })
1038            .await
1039            .unwrap();
1040        assert!(matches!(
1041            register,
1042            RuntimeCommandResult::RouteRegistered { ref route_id } if route_id == "runtime-r1"
1043        ));
1044
1045        let query = runtime
1046            .ask(RuntimeQuery::GetRouteStatus {
1047                route_id: "runtime-r1".into(),
1048            })
1049            .await
1050            .unwrap();
1051        assert!(matches!(
1052            query,
1053            RuntimeQueryResult::RouteStatus { ref status, .. } if status == "Registered"
1054        ));
1055    }
1056
1057    #[tokio::test]
1058    async fn default_runtime_journal_isolated_per_context_without_env_override() {
1059        if let Ok(value) = std::env::var("CAMEL_RUNTIME_JOURNAL_PATH")
1060            && !value.trim().is_empty()
1061        {
1062            return;
1063        }
1064
1065        let first = CamelContext::new();
1066        first
1067            .runtime()
1068            .execute(RuntimeCommand::RegisterRoute {
1069                spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1070                command_id: "iso-c1".into(),
1071                causation_id: None,
1072            })
1073            .await
1074            .unwrap();
1075
1076        let second = CamelContext::new();
1077        second
1078            .runtime()
1079            .execute(RuntimeCommand::RegisterRoute {
1080                spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1081                command_id: "iso-c2".into(),
1082                causation_id: None,
1083            })
1084            .await
1085            .unwrap();
1086    }
1087
1088    #[tokio::test]
1089    async fn runtime_commands_drive_real_route_controller_lifecycle() {
1090        let mut ctx = CamelContext::new();
1091        ctx.register_component(HoldComponent);
1092        let runtime = ctx.runtime();
1093
1094        runtime
1095            .execute(RuntimeCommand::RegisterRoute {
1096                spec: CanonicalRouteSpec::new("runtime-hold", "hold:test"),
1097                command_id: "c1".into(),
1098                causation_id: None,
1099            })
1100            .await
1101            .unwrap();
1102
1103        assert_eq!(
1104            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1105            Some("Registered".to_string())
1106        );
1107        assert!(matches!(
1108            ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1109            Some(agg)
1110                if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Registered)
1111        ));
1112
1113        runtime
1114            .execute(RuntimeCommand::StartRoute {
1115                route_id: "runtime-hold".into(),
1116                command_id: "c2".into(),
1117                causation_id: Some("c1".into()),
1118            })
1119            .await
1120            .unwrap();
1121        assert_eq!(
1122            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1123            Some("Started".to_string())
1124        );
1125        assert!(matches!(
1126            ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1127            Some(agg)
1128                if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Started)
1129        ));
1130
1131        runtime
1132            .execute(RuntimeCommand::SuspendRoute {
1133                route_id: "runtime-hold".into(),
1134                command_id: "c3".into(),
1135                causation_id: Some("c2".into()),
1136            })
1137            .await
1138            .unwrap();
1139        assert_eq!(
1140            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1141            Some("Suspended".to_string())
1142        );
1143
1144        runtime
1145            .execute(RuntimeCommand::ResumeRoute {
1146                route_id: "runtime-hold".into(),
1147                command_id: "c4".into(),
1148                causation_id: Some("c3".into()),
1149            })
1150            .await
1151            .unwrap();
1152        assert_eq!(
1153            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1154            Some("Started".to_string())
1155        );
1156
1157        runtime
1158            .execute(RuntimeCommand::StopRoute {
1159                route_id: "runtime-hold".into(),
1160                command_id: "c5".into(),
1161                causation_id: Some("c4".into()),
1162            })
1163            .await
1164            .unwrap();
1165        assert_eq!(
1166            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1167            Some("Stopped".to_string())
1168        );
1169
1170        runtime
1171            .execute(RuntimeCommand::ReloadRoute {
1172                route_id: "runtime-hold".into(),
1173                command_id: "c6".into(),
1174                causation_id: Some("c5".into()),
1175            })
1176            .await
1177            .unwrap();
1178        assert_eq!(
1179            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1180            Some("Started".to_string())
1181        );
1182
1183        runtime
1184            .execute(RuntimeCommand::StopRoute {
1185                route_id: "runtime-hold".into(),
1186                command_id: "c7".into(),
1187                causation_id: Some("c6".into()),
1188            })
1189            .await
1190            .unwrap();
1191        assert_eq!(
1192            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1193            Some("Stopped".to_string())
1194        );
1195
1196        runtime
1197            .execute(RuntimeCommand::RemoveRoute {
1198                route_id: "runtime-hold".into(),
1199                command_id: "c8".into(),
1200                causation_id: Some("c7".into()),
1201            })
1202            .await
1203            .unwrap();
1204        assert_eq!(
1205            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1206            None
1207        );
1208        assert!(
1209            ctx.runtime
1210                .repo()
1211                .load("runtime-hold")
1212                .await
1213                .unwrap()
1214                .is_none()
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn runtime_queries_read_projection_state_when_connected() {
1220        let mut ctx = CamelContext::new();
1221        ctx.register_component(HoldComponent);
1222        let runtime = ctx.runtime();
1223        runtime
1224            .execute(RuntimeCommand::RegisterRoute {
1225                spec: CanonicalRouteSpec::new("rq", "hold:test"),
1226                command_id: "c1".into(),
1227                causation_id: None,
1228            })
1229            .await
1230            .unwrap();
1231
1232        // Diverge live controller state from the projection on purpose.
1233        ctx.runtime_execution_handle()
1234            .force_start_route_for_test("rq")
1235            .await
1236            .unwrap();
1237
1238        let result = runtime
1239            .ask(RuntimeQuery::GetRouteStatus {
1240                route_id: "rq".into(),
1241            })
1242            .await
1243            .unwrap();
1244
1245        match result {
1246            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1247            _ => panic!("unexpected query result"),
1248        }
1249    }
1250
1251    #[tokio::test]
1252    async fn add_route_definition_produces_registered_state() {
1253        let ctx = CamelContext::new();
1254        let definition =
1255            RouteDefinition::new("direct:test", vec![]).with_route_id("async-test-route");
1256
1257        ctx.add_route_definition(definition).await.unwrap();
1258
1259        let status = ctx
1260            .runtime()
1261            .ask(RuntimeQuery::GetRouteStatus {
1262                route_id: "async-test-route".to_string(),
1263            })
1264            .await
1265            .unwrap();
1266
1267        match status {
1268            RuntimeQueryResult::RouteStatus { status, .. } => {
1269                assert_eq!(
1270                    status, "Registered",
1271                    "expected Registered state after add_route_definition"
1272                );
1273            }
1274            _ => panic!("unexpected query result"),
1275        }
1276    }
1277
1278    #[tokio::test]
1279    async fn add_route_definition_injects_runtime_into_producer_context() {
1280        use std::sync::atomic::{AtomicBool, Ordering};
1281
1282        struct RuntimeAwareEndpoint {
1283            saw_runtime: Arc<AtomicBool>,
1284        }
1285
1286        impl Endpoint for RuntimeAwareEndpoint {
1287            fn uri(&self) -> &str {
1288                "runtime-aware:test"
1289            }
1290
1291            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1292                Err(CamelError::RouteError("no consumer".to_string()))
1293            }
1294
1295            fn create_producer(
1296                &self,
1297                ctx: &camel_api::ProducerContext,
1298            ) -> Result<camel_api::BoxProcessor, CamelError> {
1299                self.saw_runtime
1300                    .store(ctx.runtime().is_some(), Ordering::SeqCst);
1301                if ctx.runtime().is_none() {
1302                    return Err(CamelError::RouteError(
1303                        "runtime handle missing in ProducerContext".to_string(),
1304                    ));
1305                }
1306                Ok(camel_api::BoxProcessor::new(camel_api::IdentityProcessor))
1307            }
1308        }
1309
1310        struct RuntimeAwareComponent {
1311            saw_runtime: Arc<AtomicBool>,
1312        }
1313
1314        impl Component for RuntimeAwareComponent {
1315            fn scheme(&self) -> &str {
1316                "runtime-aware"
1317            }
1318
1319            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1320                Ok(Box::new(RuntimeAwareEndpoint {
1321                    saw_runtime: Arc::clone(&self.saw_runtime),
1322                }))
1323            }
1324        }
1325
1326        let saw_runtime = Arc::new(AtomicBool::new(false));
1327        let mut ctx = CamelContext::new();
1328        ctx.register_component(RuntimeAwareComponent {
1329            saw_runtime: Arc::clone(&saw_runtime),
1330        });
1331
1332        let definition = RouteDefinition::new(
1333            "timer:tick",
1334            vec![BuilderStep::To("runtime-aware:test".to_string())],
1335        )
1336        .with_route_id("runtime-aware-route");
1337
1338        let result = ctx.add_route_definition(definition).await;
1339        assert!(
1340            result.is_ok(),
1341            "route should resolve producer with runtime context: {result:?}"
1342        );
1343        assert!(
1344            saw_runtime.load(Ordering::SeqCst),
1345            "component producer should observe runtime handle in ProducerContext"
1346        );
1347    }
1348
1349    #[tokio::test]
1350    async fn add_route_definition_registers_runtime_projection_and_aggregate() {
1351        let mut ctx = CamelContext::new();
1352        ctx.register_component(HoldComponent);
1353
1354        let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-r1");
1355        ctx.add_route_definition(definition).await.unwrap();
1356
1357        let aggregate = ctx.runtime.repo().load("ctx-runtime-r1").await.unwrap();
1358        assert!(
1359            matches!(aggregate, Some(agg) if matches!(agg.state(), RouteRuntimeState::Registered)),
1360            "route registration should seed aggregate as Registered"
1361        );
1362
1363        let status = ctx.runtime_route_status("ctx-runtime-r1").await.unwrap();
1364        assert_eq!(status.as_deref(), Some("Registered"));
1365    }
1366
1367    #[tokio::test]
1368    async fn add_route_definition_rolls_back_controller_when_runtime_registration_fails() {
1369        let mut ctx = CamelContext::new();
1370        ctx.register_component(HoldComponent);
1371
1372        ctx.runtime
1373            .repo()
1374            .save(RouteRuntimeAggregate::new("ctx-runtime-dup"))
1375            .await
1376            .unwrap();
1377
1378        let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-dup");
1379        let result = ctx.add_route_definition(definition).await;
1380        assert!(result.is_err(), "duplicate runtime registration must fail");
1381
1382        assert_eq!(
1383            ctx.runtime_execution_handle()
1384                .controller_route_count_for_test()
1385                .await,
1386            0,
1387            "controller route should be rolled back on runtime bootstrap failure"
1388        );
1389    }
1390
1391    #[tokio::test]
1392    async fn context_start_stop_drives_runtime_lifecycle_via_command_bus() {
1393        let mut ctx = CamelContext::new();
1394        ctx.register_component(HoldComponent);
1395
1396        let autostart =
1397            RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-lifecycle-auto");
1398        let lazy = RouteDefinition::new("hold:test", vec![])
1399            .with_route_id("ctx-lifecycle-lazy")
1400            .with_auto_startup(false);
1401
1402        ctx.add_route_definition(autostart).await.unwrap();
1403        ctx.add_route_definition(lazy).await.unwrap();
1404
1405        assert_eq!(
1406            ctx.runtime_route_status("ctx-lifecycle-auto")
1407                .await
1408                .unwrap(),
1409            Some("Registered".to_string())
1410        );
1411        assert_eq!(
1412            ctx.runtime_route_status("ctx-lifecycle-lazy")
1413                .await
1414                .unwrap(),
1415            Some("Registered".to_string())
1416        );
1417
1418        ctx.start().await.unwrap();
1419
1420        assert_eq!(
1421            ctx.runtime_route_status("ctx-lifecycle-auto")
1422                .await
1423                .unwrap(),
1424            Some("Started".to_string())
1425        );
1426        assert_eq!(
1427            ctx.runtime_route_status("ctx-lifecycle-lazy")
1428                .await
1429                .unwrap(),
1430            Some("Registered".to_string())
1431        );
1432
1433        ctx.stop().await.unwrap();
1434
1435        assert_eq!(
1436            ctx.runtime_route_status("ctx-lifecycle-auto")
1437                .await
1438                .unwrap(),
1439            Some("Stopped".to_string())
1440        );
1441        assert_eq!(
1442            ctx.runtime_route_status("ctx-lifecycle-lazy")
1443                .await
1444                .unwrap(),
1445            Some("Registered".to_string())
1446        );
1447    }
1448}
1449
1450#[cfg(test)]
1451mod lifecycle_tests {
1452    use super::*;
1453    use async_trait::async_trait;
1454    use camel_api::Lifecycle;
1455    use std::sync::Arc;
1456    use std::sync::atomic::{AtomicUsize, Ordering};
1457
1458    struct MockService {
1459        start_count: Arc<AtomicUsize>,
1460        stop_count: Arc<AtomicUsize>,
1461    }
1462
1463    impl MockService {
1464        fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
1465            let start_count = Arc::new(AtomicUsize::new(0));
1466            let stop_count = Arc::new(AtomicUsize::new(0));
1467            (
1468                Self {
1469                    start_count: start_count.clone(),
1470                    stop_count: stop_count.clone(),
1471                },
1472                start_count,
1473                stop_count,
1474            )
1475        }
1476    }
1477
1478    #[async_trait]
1479    impl Lifecycle for MockService {
1480        fn name(&self) -> &str {
1481            "mock"
1482        }
1483
1484        async fn start(&mut self) -> Result<(), CamelError> {
1485            self.start_count.fetch_add(1, Ordering::SeqCst);
1486            Ok(())
1487        }
1488
1489        async fn stop(&mut self) -> Result<(), CamelError> {
1490            self.stop_count.fetch_add(1, Ordering::SeqCst);
1491            Ok(())
1492        }
1493    }
1494
1495    #[tokio::test]
1496    async fn test_context_starts_lifecycle_services() {
1497        let (service, start_count, stop_count) = MockService::new();
1498
1499        let mut ctx = CamelContext::new().with_lifecycle(service);
1500
1501        assert_eq!(start_count.load(Ordering::SeqCst), 0);
1502
1503        ctx.start().await.unwrap();
1504
1505        assert_eq!(start_count.load(Ordering::SeqCst), 1);
1506        assert_eq!(stop_count.load(Ordering::SeqCst), 0);
1507
1508        ctx.stop().await.unwrap();
1509
1510        assert_eq!(stop_count.load(Ordering::SeqCst), 1);
1511    }
1512
1513    #[tokio::test]
1514    async fn test_service_start_failure_rollback() {
1515        struct FailingService {
1516            start_count: Arc<AtomicUsize>,
1517            stop_count: Arc<AtomicUsize>,
1518            should_fail: bool,
1519        }
1520
1521        #[async_trait]
1522        impl Lifecycle for FailingService {
1523            fn name(&self) -> &str {
1524                "failing"
1525            }
1526
1527            async fn start(&mut self) -> Result<(), CamelError> {
1528                self.start_count.fetch_add(1, Ordering::SeqCst);
1529                if self.should_fail {
1530                    Err(CamelError::ProcessorError("intentional failure".into()))
1531                } else {
1532                    Ok(())
1533                }
1534            }
1535
1536            async fn stop(&mut self) -> Result<(), CamelError> {
1537                self.stop_count.fetch_add(1, Ordering::SeqCst);
1538                Ok(())
1539            }
1540        }
1541
1542        let start1 = Arc::new(AtomicUsize::new(0));
1543        let stop1 = Arc::new(AtomicUsize::new(0));
1544        let start2 = Arc::new(AtomicUsize::new(0));
1545        let stop2 = Arc::new(AtomicUsize::new(0));
1546        let start3 = Arc::new(AtomicUsize::new(0));
1547        let stop3 = Arc::new(AtomicUsize::new(0));
1548
1549        let service1 = FailingService {
1550            start_count: start1.clone(),
1551            stop_count: stop1.clone(),
1552            should_fail: false,
1553        };
1554        let service2 = FailingService {
1555            start_count: start2.clone(),
1556            stop_count: stop2.clone(),
1557            should_fail: true, // This one will fail
1558        };
1559        let service3 = FailingService {
1560            start_count: start3.clone(),
1561            stop_count: stop3.clone(),
1562            should_fail: false,
1563        };
1564
1565        let mut ctx = CamelContext::new()
1566            .with_lifecycle(service1)
1567            .with_lifecycle(service2)
1568            .with_lifecycle(service3);
1569
1570        // Attempt to start - should fail
1571        let result = ctx.start().await;
1572        assert!(result.is_err());
1573
1574        // Verify service1 was started and then stopped (rollback)
1575        assert_eq!(start1.load(Ordering::SeqCst), 1);
1576        assert_eq!(stop1.load(Ordering::SeqCst), 1);
1577
1578        // Verify service2 was attempted to start but failed
1579        assert_eq!(start2.load(Ordering::SeqCst), 1);
1580        assert_eq!(stop2.load(Ordering::SeqCst), 0);
1581
1582        // Verify service3 was never started
1583        assert_eq!(start3.load(Ordering::SeqCst), 0);
1584        assert_eq!(stop3.load(Ordering::SeqCst), 0);
1585    }
1586
1587    #[tokio::test]
1588    async fn test_services_stop_in_reverse_order() {
1589        use std::sync::Mutex as StdMutex;
1590
1591        struct OrderTracker {
1592            name: String,
1593            order: Arc<StdMutex<Vec<String>>>,
1594        }
1595
1596        #[async_trait]
1597        impl Lifecycle for OrderTracker {
1598            fn name(&self) -> &str {
1599                &self.name
1600            }
1601
1602            async fn start(&mut self) -> Result<(), CamelError> {
1603                Ok(())
1604            }
1605
1606            async fn stop(&mut self) -> Result<(), CamelError> {
1607                self.order.lock().unwrap().push(self.name.clone());
1608                Ok(())
1609            }
1610        }
1611
1612        let order = Arc::new(StdMutex::new(Vec::<String>::new()));
1613
1614        let s1 = OrderTracker {
1615            name: "first".into(),
1616            order: Arc::clone(&order),
1617        };
1618        let s2 = OrderTracker {
1619            name: "second".into(),
1620            order: Arc::clone(&order),
1621        };
1622        let s3 = OrderTracker {
1623            name: "third".into(),
1624            order: Arc::clone(&order),
1625        };
1626
1627        let mut ctx = CamelContext::new()
1628            .with_lifecycle(s1)
1629            .with_lifecycle(s2)
1630            .with_lifecycle(s3);
1631
1632        ctx.start().await.unwrap();
1633        ctx.stop().await.unwrap();
1634
1635        let stopped = order.lock().unwrap();
1636        assert_eq!(
1637            *stopped,
1638            vec!["third", "second", "first"],
1639            "services must stop in reverse insertion order"
1640        );
1641    }
1642}
1643
1644#[cfg(test)]
1645mod config_registry_tests {
1646    use super::*;
1647
1648    #[derive(Debug, Clone, PartialEq)]
1649    struct MyConfig {
1650        value: u32,
1651    }
1652
1653    #[test]
1654    fn test_set_and_get_component_config() {
1655        let mut ctx = CamelContext::new();
1656        ctx.set_component_config(MyConfig { value: 42 });
1657        let got = ctx.get_component_config::<MyConfig>();
1658        assert_eq!(got, Some(&MyConfig { value: 42 }));
1659    }
1660
1661    #[test]
1662    fn test_get_missing_config_returns_none() {
1663        let ctx = CamelContext::new();
1664        assert!(ctx.get_component_config::<MyConfig>().is_none());
1665    }
1666
1667    #[test]
1668    fn test_set_overwrites_previous_config() {
1669        let mut ctx = CamelContext::new();
1670        ctx.set_component_config(MyConfig { value: 1 });
1671        ctx.set_component_config(MyConfig { value: 2 });
1672        assert_eq!(ctx.get_component_config::<MyConfig>().unwrap().value, 2);
1673    }
1674}