// camel_core/context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, PlatformIdentity,
12    PlatformService, ReadinessGate, RouteTemplateSpec, RuntimeCommandBus, RuntimeQueryBus,
13    TemplateInstanceRecord,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::health_registry::HealthCheckRegistry;
19use crate::lifecycle::adapters::controller_actor::RouteControllerHandle;
20use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
21use crate::lifecycle::application::route_definition::RouteDefinition;
22use crate::lifecycle::application::runtime_bus::RuntimeBus;
23use crate::lifecycle::domain::LanguageRegistryError;
24use crate::shared::components::domain::Registry;
25use crate::shared::observability::domain::TracerConfig;
26use crate::template::TemplateRegistry;
27
/// Process-wide counter appended to command ids built by
/// `CamelContext::next_context_command_id`, so each generated id is distinct.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
29
30pub use crate::context_builder::CamelContextBuilder;
31
32/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
33///
34/// # Lifecycle
35///
36/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
37/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
38/// stopped context is not supported — create a new instance instead.
39pub struct CamelContext {
40    registry: Arc<std::sync::Mutex<Registry>>,
41    route_controller: RouteControllerHandle,
42    _actor_join: tokio::task::JoinHandle<()>,
43    supervision_join: Option<tokio::task::JoinHandle<()>>,
44    runtime: Arc<RuntimeBus>,
45    cancel_token: CancellationToken,
46    metrics: Arc<dyn MetricsCollector>,
47    // Platform ports
48    platform_service: Arc<dyn PlatformService>,
49    languages: SharedLanguageRegistry,
50    shutdown_timeout: std::time::Duration,
51    services: Vec<Box<dyn Lifecycle>>,
52    health_registry: Arc<HealthCheckRegistry>,
53    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
54    function_invoker: Option<Arc<dyn FunctionInvoker>>,
55    template_registry: Arc<TemplateRegistry>,
56}
57
58/// Parts bag used by [`CamelContextBuilder::build`] to construct a [`CamelContext`]
59/// without accessing private fields from a sibling module.
60pub(crate) struct FromParts {
61    pub(crate) registry: Arc<std::sync::Mutex<Registry>>,
62    pub(crate) route_controller: RouteControllerHandle,
63    pub(crate) _actor_join: tokio::task::JoinHandle<()>,
64    pub(crate) supervision_join: Option<tokio::task::JoinHandle<()>>,
65    pub(crate) runtime: Arc<RuntimeBus>,
66    pub(crate) cancel_token: CancellationToken,
67    pub(crate) metrics: Arc<dyn MetricsCollector>,
68    pub(crate) platform_service: Arc<dyn PlatformService>,
69    pub(crate) languages: SharedLanguageRegistry,
70    pub(crate) shutdown_timeout: std::time::Duration,
71    pub(crate) services: Vec<Box<dyn Lifecycle>>,
72    pub(crate) health_registry: Arc<HealthCheckRegistry>,
73    pub(crate) component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
74    pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
75    pub(crate) template_registry: Arc<TemplateRegistry>,
76}
77
78impl CamelContext {
79    pub(crate) fn from_parts(parts: FromParts) -> Self {
80        Self {
81            registry: parts.registry,
82            route_controller: parts.route_controller,
83            _actor_join: parts._actor_join,
84            supervision_join: parts.supervision_join,
85            runtime: parts.runtime,
86            cancel_token: parts.cancel_token,
87            metrics: parts.metrics,
88            platform_service: parts.platform_service,
89            languages: parts.languages,
90            shutdown_timeout: parts.shutdown_timeout,
91            services: parts.services,
92            health_registry: parts.health_registry,
93            component_configs: parts.component_configs,
94            function_invoker: parts.function_invoker,
95            template_registry: parts.template_registry,
96        }
97    }
98}
99
100/// Opaque handle for runtime side-effect execution operations.
101///
102/// This intentionally does not expose direct lifecycle mutation APIs to callers.
103#[derive(Clone)]
104pub struct RuntimeExecutionHandle {
105    controller: RouteControllerHandle,
106    runtime: Arc<RuntimeBus>,
107    function_invoker: Option<Arc<dyn FunctionInvoker>>,
108}
109
110impl RuntimeExecutionHandle {
111    pub(crate) async fn add_route_definition(
112        &self,
113        definition: RouteDefinition,
114    ) -> Result<(), CamelError> {
115        use crate::lifecycle::ports::RouteRegistrationPort;
116        self.runtime
117            .register_route(definition)
118            .await
119            .map_err(Into::into)
120    }
121
122    pub(crate) async fn compile_route_definition(
123        &self,
124        definition: RouteDefinition,
125    ) -> Result<camel_api::BoxProcessor, CamelError> {
126        self.controller.compile_route_definition(definition).await
127    }
128
129    pub(crate) async fn compile_route_definition_with_generation(
130        &self,
131        definition: RouteDefinition,
132        generation: u64,
133    ) -> Result<camel_api::BoxProcessor, CamelError> {
134        self.controller
135            .compile_route_definition_with_generation(definition, generation)
136            .await
137    }
138
139    pub(crate) async fn prepare_route_definition_with_generation(
140        &self,
141        definition: RouteDefinition,
142        generation: u64,
143    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
144        self.controller
145            .prepare_route_definition_with_generation(definition, generation)
146            .await
147    }
148
149    pub(crate) async fn insert_prepared_route(
150        &self,
151        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
152    ) -> Result<(), CamelError> {
153        self.controller.insert_prepared_route(prepared).await
154    }
155
156    pub(crate) async fn remove_route_preserving_functions(
157        &self,
158        route_id: String,
159    ) -> Result<(), CamelError> {
160        self.controller
161            .remove_route_preserving_functions(route_id)
162            .await
163    }
164
165    pub(crate) async fn register_route_aggregate(
166        &self,
167        route_id: String,
168    ) -> Result<(), CamelError> {
169        self.runtime.register_aggregate_only(route_id).await
170    }
171
172    pub(crate) async fn swap_route_pipeline(
173        &self,
174        route_id: &str,
175        pipeline: camel_api::BoxProcessor,
176    ) -> Result<(), CamelError> {
177        self.controller.swap_pipeline(route_id, pipeline).await
178    }
179
180    pub(crate) async fn execute_runtime_command(
181        &self,
182        cmd: camel_api::RuntimeCommand,
183    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
184        self.runtime.execute(cmd).await
185    }
186
187    pub(crate) async fn runtime_route_status(
188        &self,
189        route_id: &str,
190    ) -> Result<Option<String>, CamelError> {
191        match self
192            .runtime
193            .ask(camel_api::RuntimeQuery::GetRouteStatus {
194                route_id: route_id.to_string(),
195            })
196            .await
197        {
198            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
199            Ok(_) => Err(CamelError::RouteError(
200                "unexpected runtime query response for route status".to_string(),
201            )),
202            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
203            Err(err) => Err(err),
204        }
205    }
206
207    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
208        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
209            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
210            Ok(_) => Err(CamelError::RouteError(
211                "unexpected runtime query response for route listing".to_string(),
212            )),
213            Err(err) => Err(err),
214        }
215    }
216
217    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
218        self.controller.route_source_hash(route_id).await
219    }
220
221    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
222        if !self.controller.route_exists(route_id).await? {
223            return Err(CamelError::RouteError(format!(
224                "Route '{}' not found",
225                route_id
226            )));
227        }
228        Ok(self
229            .controller
230            .in_flight_count(route_id)
231            .await?
232            .unwrap_or(0))
233    }
234
235    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
236        self.function_invoker.clone()
237    }
238
239    #[cfg(test)]
240    pub(crate) async fn force_start_route_for_test(
241        &self,
242        route_id: &str,
243    ) -> Result<(), CamelError> {
244        self.controller.start_route(route_id).await
245    }
246
247    pub async fn controller_route_count_for_test(&self) -> usize {
248        self.controller.route_count().await.unwrap_or(0)
249    }
250}
251
252impl CamelContext {
253    pub fn builder() -> CamelContextBuilder {
254        CamelContextBuilder::new()
255    }
256
257    /// Set a global error handler applied to all routes without a per-route handler.
258    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
259        let _ = self.route_controller.set_error_handler(config).await;
260    }
261
262    /// Enable or disable tracing globally.
263    pub async fn set_tracing(&mut self, enabled: bool) {
264        let _ = self
265            .route_controller
266            .set_tracer_config(TracerConfig {
267                enabled,
268                ..Default::default()
269            })
270            .await;
271    }
272
273    /// Configure tracing with full config.
274    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
275        // Inject metrics collector if not already set
276        let config = if config.metrics_collector.is_none() {
277            TracerConfig {
278                metrics_collector: Some(Arc::clone(&self.metrics)),
279                ..config
280            }
281        } else {
282            config
283        };
284
285        let _ = self.route_controller.set_tracer_config(config).await;
286    }
287
288    /// Builder-style: enable tracing with default config.
289    pub async fn with_tracing(mut self) -> Self {
290        self.set_tracing(true).await;
291        self
292    }
293
294    /// Builder-style: configure tracing with custom config.
295    /// Note: tracing subscriber initialization (stdout/file output) is handled
296    /// separately via init_tracing_subscriber (called in camel-config bridge).
297    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
298        self.set_tracer_config(config).await;
299        self
300    }
301
302    /// Register a lifecycle service (Apache Camel: addService pattern)
303    ///
304    /// For services exposing `as_function_invoker()`, the invoker is propagated
305    /// to the route controller so that subsequent route definitions with function
306    /// steps work correctly.
307    ///
308    /// Prefer [`CamelContextBuilder::with_lifecycle`] when possible, which wires
309    /// the invoker at build time before any routes are added.
310    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
311        if let Some(collector) = service.as_metrics_collector() {
312            self.metrics = collector;
313        }
314        if let Some(invoker) = service.as_function_invoker() {
315            self.function_invoker = Some(invoker.clone());
316            if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
317                tracing::warn!("Failed to propagate function invoker to route controller: {e}");
318            }
319        }
320
321        self.services.push(Box::new(service));
322        self
323    }
324
325    /// Register a component with this context.
326    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
327        info!(scheme = component.scheme(), "Registering component");
328        self.registry
329            .lock()
330            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
331            .register(Arc::new(component));
332    }
333
334    /// Register a language with this context, keyed by name.
335    ///
336    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
337    /// with the same name is already registered. Use
338    /// [`resolve_language`](Self::resolve_language) to check before
339    /// registering, or choose a distinct name.
340    pub fn register_language(
341        &mut self,
342        name: impl Into<String>,
343        lang: Box<dyn Language>,
344    ) -> Result<(), LanguageRegistryError> {
345        let name = name.into();
346        let mut languages = self
347            .languages
348            .lock()
349            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
350        if languages.contains_key(&name) {
351            return Err(LanguageRegistryError::AlreadyRegistered { name });
352        }
353        languages.insert(name, Arc::from(lang));
354        Ok(())
355    }
356
357    /// Resolve a language by name. Returns `None` if not registered.
358    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
359        let languages = self
360            .languages
361            .lock()
362            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
363        languages.get(name).cloned()
364    }
365
366    /// Add a route definition to this context.
367    ///
368    /// The route must have an ID. Steps are resolved immediately using registered components.
369    pub async fn add_route_definition(
370        &self,
371        definition: RouteDefinition,
372    ) -> Result<(), CamelError> {
373        use crate::lifecycle::ports::RouteRegistrationPort;
374        info!(
375            from = definition.from_uri(),
376            route_id = %definition.route_id(),
377            "Adding route definition"
378        );
379        self.runtime
380            .register_route(definition)
381            .await
382            .map_err(Into::into)
383    }
384
385    fn next_context_command_id(op: &str, route_id: &str) -> String {
386        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
387        format!("context:{op}:{route_id}:{seq}")
388    }
389
390    /// Access the component registry.
391    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
392        self.registry
393            .lock()
394            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
395    }
396
397    /// Access the shared component registry Arc.
398    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
399        Arc::clone(&self.registry)
400    }
401
402    /// Get runtime execution handle for file-watcher integrations.
403    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
404        RuntimeExecutionHandle {
405            controller: self.route_controller.clone(),
406            runtime: Arc::clone(&self.runtime),
407            function_invoker: self.function_invoker.clone(),
408        }
409    }
410
411    /// Get the metrics collector.
412    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
413        Arc::clone(&self.metrics)
414    }
415
416    /// Get the platform service.
417    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
418        Arc::clone(&self.platform_service)
419    }
420
421    /// Get the readiness gate port.
422    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
423        self.platform_service.readiness_gate()
424    }
425
426    /// Get the platform identity.
427    pub fn platform_identity(&self) -> PlatformIdentity {
428        self.platform_service.identity()
429    }
430
431    /// Get the leadership service port.
432    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
433        self.platform_service.leadership()
434    }
435
436    /// Get runtime command/query bus handle.
437    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
438        self.runtime.clone()
439    }
440
441    /// Build a producer context wired to this runtime.
442    pub fn producer_context(&self) -> camel_api::ProducerContext {
443        camel_api::ProducerContext::new().with_runtime(self.runtime())
444    }
445
446    /// Query route status via runtime read-model.
447    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
448        match self
449            .runtime()
450            .ask(camel_api::RuntimeQuery::GetRouteStatus {
451                route_id: route_id.to_string(),
452            })
453            .await
454        {
455            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
456            Ok(_) => Err(CamelError::RouteError(
457                "unexpected runtime query response for route status".to_string(),
458            )),
459            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
460            Err(err) => Err(err),
461        }
462    }
463
464    /// Start all routes. Each route's consumer will begin producing exchanges.
465    ///
466    /// Only routes with `auto_startup == true` will be started, in order of their
467    /// `startup_order` (lower values start first).
468    pub async fn start(&mut self) -> Result<(), CamelError> {
469        info!("Starting CamelContext");
470
471        // Start lifecycle services first
472        for (i, service) in self.services.iter_mut().enumerate() {
473            info!("Starting service: {}", service.name());
474            if let Err(e) = service.start().await {
475                // Rollback: stop already started services in reverse order
476                warn!(
477                    "Service {} failed to start, rolling back {} services",
478                    service.name(),
479                    i
480                );
481                for j in (0..i).rev() {
482                    if let Err(rollback_err) = self.services[j].stop().await {
483                        warn!(
484                            "Failed to stop service {} during rollback: {}",
485                            self.services[j].name(),
486                            rollback_err
487                        );
488                    }
489                }
490                return Err(e);
491            }
492        }
493
494        // Then start routes via runtime command bus (aggregate-first),
495        // preserving route controller startup ordering metadata.
496        let route_ids = self.route_controller.auto_startup_route_ids().await?;
497        for route_id in route_ids {
498            self.runtime
499                .execute(camel_api::RuntimeCommand::StartRoute {
500                    route_id: route_id.clone(),
501                    command_id: Self::next_context_command_id("start", &route_id),
502                    causation_id: None,
503                })
504                .await?;
505        }
506
507        info!("CamelContext started");
508        Ok(())
509    }
510
511    /// Graceful shutdown with default 30-second timeout.
512    pub async fn stop(&mut self) -> Result<(), CamelError> {
513        self.stop_timeout(self.shutdown_timeout).await
514    }
515
516    /// Graceful shutdown with custom timeout.
517    ///
518    /// Note: The timeout parameter is currently not propagated to the
519    /// RouteController's per-route shutdown timeout. The RouteController
520    /// uses a hardcoded 5-second default (`DEFAULT_SHUTDOWN_TIMEOUT`).
521    /// Full propagation is planned for a future version.
522    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
523        info!("Stopping CamelContext");
524
525        // Signal cancellation (for any legacy code that might use it)
526        self.cancel_token.cancel();
527        if let Some(join) = self.supervision_join.take() {
528            join.abort();
529        }
530
531        // Stop all routes via runtime command bus (aggregate-first),
532        // preserving route controller shutdown ordering metadata.
533        let route_ids = self.route_controller.shutdown_route_ids().await?;
534        for route_id in route_ids {
535            if let Err(err) = self
536                .runtime
537                .execute(camel_api::RuntimeCommand::StopRoute {
538                    route_id: route_id.clone(),
539                    command_id: Self::next_context_command_id("stop", &route_id),
540                    causation_id: None,
541                })
542                .await
543            {
544                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
545            }
546        }
547
548        self.health_registry.cancel_token().cancel();
549
550        // Then stop lifecycle services in reverse insertion order (LIFO)
551        // Continue stopping all services even if some fail
552        let mut first_error = None;
553        for service in self.services.iter_mut().rev() {
554            info!("Stopping service: {}", service.name());
555            if let Err(e) = service.stop().await {
556                warn!("Service {} failed to stop: {}", service.name(), e);
557                if first_error.is_none() {
558                    first_error = Some(e);
559                }
560            }
561        }
562
563        info!("CamelContext stopped");
564
565        if let Some(e) = first_error {
566            Err(e)
567        } else {
568            Ok(())
569        }
570    }
571
572    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
573    pub fn shutdown_timeout(&self) -> std::time::Duration {
574        self.shutdown_timeout
575    }
576
577    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
578    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
579        self.shutdown_timeout = timeout;
580    }
581
582    /// Immediate abort — kills all tasks without draining.
583    pub async fn abort(&mut self) {
584        self.cancel_token.cancel();
585        if let Some(join) = self.supervision_join.take() {
586            join.abort();
587        }
588        let route_ids = self
589            .route_controller
590            .shutdown_route_ids()
591            .await
592            .unwrap_or_default();
593        for route_id in route_ids {
594            let _ = self
595                .runtime
596                .execute(camel_api::RuntimeCommand::StopRoute {
597                    route_id: route_id.clone(),
598                    command_id: Self::next_context_command_id("abort-stop", &route_id),
599                    causation_id: None,
600                })
601                .await;
602        }
603
604        for service in self.services.iter_mut().rev() {
605            let name = service.name().to_string();
606            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
607                Ok(Ok(())) => info!("Aborted service: {}", name),
608                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
609                Err(_) => warn!("Service {} timed out during abort (5s)", name),
610            }
611        }
612    }
613
614    /// Check health status of all registered services and lifecycle services.
615    pub async fn health_check(&self) -> HealthReport {
616        use camel_api::HealthSource;
617        self.health_report().await
618    }
619
620    pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
621        Arc::clone(&self.health_registry)
622    }
623
624    /// Store a component config. Overwrites any previously stored config of the same type.
625    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
626        self.component_configs
627            .insert(TypeId::of::<T>(), Box::new(config));
628    }
629
630    /// Retrieve a stored component config by type. Returns None if not stored.
631    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
632        self.component_configs
633            .get(&TypeId::of::<T>())
634            .and_then(|b| b.downcast_ref::<T>())
635    }
636
637    // --- Route Template Registry (data-only) ---
638
639    /// Register a route template specification.
640    ///
641    /// Returns `Err(CamelError)` if a template with the same ID is already registered.
642    pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
643        self.template_registry.register(spec)
644    }
645
646    /// Retrieve a route template specification by its ID.
647    pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
648        self.template_registry.get(id)
649    }
650
651    /// Return all registered template IDs.
652    pub fn template_ids(&self) -> Vec<String> {
653        self.template_registry.template_ids()
654    }
655
656    /// Record a newly instantiated template instance.
657    pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
658        self.template_registry.record_instance(record)
659    }
660
661    /// Return all instance records for a given template ID.
662    pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
663        self.template_registry.instances(template_id)
664    }
665}
666
667impl ComponentRegistrar for CamelContext {
668    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
669        self.registry
670            .lock()
671            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
672            .register(component);
673    }
674}
675
676impl ComponentContext for CamelContext {
677    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
678        self.registry.lock().ok()?.get(scheme)
679    }
680
681    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
682        self.languages.lock().ok()?.get(name).cloned()
683    }
684
685    fn metrics(&self) -> Arc<dyn MetricsCollector> {
686        Arc::clone(&self.metrics)
687    }
688
689    fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
690        // The concrete HealthCheckRegistry struct implements the trait via
691        // the impl added in health_registry.rs.
692        Arc::clone(&self.health_registry) as Arc<dyn camel_component_api::HealthCheckRegistry>
693    }
694
695    fn platform_service(&self) -> Arc<dyn PlatformService> {
696        Arc::clone(&self.platform_service)
697    }
698
699    fn register_route_health_check(
700        &self,
701        route_id: &str,
702        check: Arc<dyn camel_api::AsyncHealthCheck>,
703    ) {
704        self.health_registry.register_for_route(route_id, check);
705    }
706
707    fn unregister_route_health_check(&self, route_id: &str) {
708        self.health_registry.unregister_for_route(route_id);
709    }
710}
711
712#[async_trait::async_trait]
713impl camel_api::HealthSource for CamelContext {
714    async fn liveness(&self) -> camel_api::HealthStatus {
715        let has_failed = self
716            .services
717            .iter()
718            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
719        if has_failed {
720            camel_api::HealthStatus::Unhealthy
721        } else {
722            camel_api::HealthStatus::Healthy
723        }
724    }
725
726    async fn readiness(&self) -> camel_api::HealthStatus {
727        let has_failed = self
728            .services
729            .iter()
730            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
731        if has_failed {
732            return camel_api::HealthStatus::Unhealthy;
733        }
734        let has_stopped = self
735            .services
736            .iter()
737            .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
738        if has_stopped {
739            return camel_api::HealthStatus::Degraded;
740        }
741        self.health_registry.check_all().await.status
742    }
743
744    async fn health_report(&self) -> camel_api::HealthReport {
745        let mut report = self.health_registry.check_all().await;
746        let mut worst = report.status;
747        for service in &self.services {
748            let svc_status = service.status();
749            let health = match svc_status {
750                camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
751                camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
752                camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
753            };
754            if matches!(worst, camel_api::HealthStatus::Healthy)
755                && matches!(
756                    health,
757                    camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
758                )
759            {
760                worst = health;
761            }
762            if matches!(worst, camel_api::HealthStatus::Degraded)
763                && matches!(health, camel_api::HealthStatus::Unhealthy)
764            {
765                worst = health;
766            }
767            report.services.push(camel_api::ServiceHealth {
768                name: service.name().to_string(),
769                status: svc_status,
770                message: None,
771            });
772        }
773        report.status = worst;
774        report
775    }
776
777    async fn startup(&self) -> camel_api::HealthStatus {
778        camel_api::HealthStatus::Healthy
779    }
780}
781
782#[cfg(test)]
783#[path = "context_tests.rs"]
784mod context_tests;