Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9#[cfg(test)]
10use camel_api::StepLifecycle;
11use camel_api::error_handler::ErrorHandlerConfig;
12use camel_api::{
13    CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, PlatformIdentity,
14    PlatformService, ReadinessGate, RouteTemplateSpec, RuntimeCommandBus, RuntimeQueryBus,
15    TemplateInstanceRecord,
16};
17use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
18use camel_language_api::Language;
19
20use crate::health_registry::HealthCheckRegistry;
21use crate::lifecycle::adapters::controller_actor::RouteControllerHandle;
22use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
23use crate::lifecycle::application::route_definition::RouteDefinition;
24use crate::lifecycle::application::runtime_bus::RuntimeBus;
25use crate::lifecycle::domain::LanguageRegistryError;
26use crate::registry::RegistryError;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29use crate::template::TemplateRegistry;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub use crate::context_builder::CamelContextBuilder;
34
35/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
36///
37/// # Lifecycle
38///
39/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
40/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
41/// stopped context is not supported — create a new instance instead.
42pub struct CamelContext {
43    registry: Arc<std::sync::Mutex<Registry>>,
44    route_controller: RouteControllerHandle,
45    _actor_join: tokio::task::JoinHandle<()>,
46    supervision_join: Option<tokio::task::JoinHandle<()>>,
47    runtime: Arc<RuntimeBus>,
48    cancel_token: CancellationToken,
49    metrics: Arc<dyn MetricsCollector>,
50    // Platform ports
51    platform_service: Arc<dyn PlatformService>,
52    languages: SharedLanguageRegistry,
53    shutdown_timeout: std::time::Duration,
54    services: Vec<Box<dyn Lifecycle>>,
55    health_registry: Arc<HealthCheckRegistry>,
56    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
57    function_invoker: Option<Arc<dyn FunctionInvoker>>,
58    template_registry: Arc<TemplateRegistry>,
59    idempotent_repositories: crate::registry::SharedIdempotentRegistry,
60    claim_check_repositories: crate::registry::SharedClaimCheckRegistry,
61}
62
63/// Parts bag used by [`CamelContextBuilder::build`] to construct a [`CamelContext`]
64/// without accessing private fields from a sibling module.
65pub(crate) struct FromParts {
66    pub(crate) registry: Arc<std::sync::Mutex<Registry>>,
67    pub(crate) route_controller: RouteControllerHandle,
68    pub(crate) _actor_join: tokio::task::JoinHandle<()>,
69    pub(crate) supervision_join: Option<tokio::task::JoinHandle<()>>,
70    pub(crate) runtime: Arc<RuntimeBus>,
71    pub(crate) cancel_token: CancellationToken,
72    pub(crate) metrics: Arc<dyn MetricsCollector>,
73    pub(crate) platform_service: Arc<dyn PlatformService>,
74    pub(crate) languages: SharedLanguageRegistry,
75    pub(crate) shutdown_timeout: std::time::Duration,
76    pub(crate) services: Vec<Box<dyn Lifecycle>>,
77    pub(crate) health_registry: Arc<HealthCheckRegistry>,
78    pub(crate) component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
79    pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
80    pub(crate) template_registry: Arc<TemplateRegistry>,
81    pub(crate) idempotent_repositories: crate::registry::SharedIdempotentRegistry,
82    pub(crate) claim_check_repositories: crate::registry::SharedClaimCheckRegistry,
83}
84
85impl CamelContext {
86    pub(crate) fn from_parts(parts: FromParts) -> Self {
87        Self {
88            registry: parts.registry,
89            route_controller: parts.route_controller,
90            _actor_join: parts._actor_join,
91            supervision_join: parts.supervision_join,
92            runtime: parts.runtime,
93            cancel_token: parts.cancel_token,
94            metrics: parts.metrics,
95            platform_service: parts.platform_service,
96            languages: parts.languages,
97            shutdown_timeout: parts.shutdown_timeout,
98            services: parts.services,
99            health_registry: parts.health_registry,
100            component_configs: parts.component_configs,
101            function_invoker: parts.function_invoker,
102            template_registry: parts.template_registry,
103            idempotent_repositories: parts.idempotent_repositories,
104            claim_check_repositories: parts.claim_check_repositories,
105        }
106    }
107}
108
109/// Opaque handle for runtime side-effect execution operations.
110///
111/// This intentionally does not expose direct lifecycle mutation APIs to callers.
112#[derive(Clone)]
113pub struct RuntimeExecutionHandle {
114    pub(crate) controller: RouteControllerHandle,
115    pub(crate) runtime: Arc<RuntimeBus>,
116    pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
117    /// Lifecycle handles to inject into the compiled pipeline during
118    /// `apply_swap`.  Used in tests to simulate lifecycle-bearing routes
119    /// (e.g. resequencer).  Always `None` in production.
120    #[cfg(test)]
121    #[allow(clippy::type_complexity)]
122    pub(crate) test_lifecycle_inject: Arc<std::sync::Mutex<Option<Vec<Arc<dyn StepLifecycle>>>>>,
123}
124
125impl RuntimeExecutionHandle {
126    pub(crate) async fn add_route_definition(
127        &self,
128        definition: RouteDefinition,
129    ) -> Result<(), CamelError> {
130        use crate::lifecycle::ports::RouteRegistrationPort;
131        self.runtime
132            .register_route(definition)
133            .await
134            .map_err(Into::into)
135    }
136
137    /// Compile a route definition into a bare BoxProcessor (no lifecycle).
138    /// Kept for tests; hot-reload uses the lifecycle-preserving variant instead.
139    #[allow(dead_code)]
140    pub(crate) async fn compile_route_definition(
141        &self,
142        definition: RouteDefinition,
143    ) -> Result<camel_api::BoxProcessor, CamelError> {
144        self.controller.compile_route_definition(definition).await
145    }
146
147    #[allow(dead_code)] // kept for potential future hot-reload paths
148    pub(crate) async fn compile_route_definition_with_generation(
149        &self,
150        definition: RouteDefinition,
151        generation: u64,
152    ) -> Result<camel_api::BoxProcessor, CamelError> {
153        self.controller
154            .compile_route_definition_with_generation(definition, generation)
155            .await
156    }
157
158    pub(crate) async fn compile_route_definition_pipeline(
159        &self,
160        definition: RouteDefinition,
161        generation: u64,
162    ) -> Result<crate::lifecycle::adapters::route_helpers::CompiledPipeline, CamelError> {
163        self.controller
164            .compile_route_definition_pipeline(definition, generation)
165            .await
166    }
167
168    /// Compile without function generation, returning full CompiledPipeline.
169    /// Oracle Fix 1: stateless hot-reload path preserves lifecycle handles.
170    pub(crate) async fn compile_route_definition_dry_pipeline(
171        &self,
172        definition: RouteDefinition,
173    ) -> Result<crate::lifecycle::adapters::route_helpers::CompiledPipeline, CamelError> {
174        self.controller
175            .compile_route_definition_dry_pipeline(definition)
176            .await
177    }
178
179    pub(crate) async fn prepare_route_definition_with_generation(
180        &self,
181        definition: RouteDefinition,
182        generation: u64,
183    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
184        self.controller
185            .prepare_route_definition_with_generation(definition, generation)
186            .await
187    }
188
189    pub(crate) async fn insert_prepared_route(
190        &self,
191        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
192    ) -> Result<(), CamelError> {
193        self.controller.insert_prepared_route(prepared).await
194    }
195
196    pub(crate) async fn remove_route_preserving_functions(
197        &self,
198        route_id: String,
199    ) -> Result<(), CamelError> {
200        self.controller
201            .remove_route_preserving_functions(route_id)
202            .await
203    }
204
205    pub(crate) async fn register_route_aggregate(
206        &self,
207        route_id: String,
208    ) -> Result<(), CamelError> {
209        self.runtime.register_aggregate_only(route_id).await
210    }
211
212    pub(crate) async fn swap_route_pipeline(
213        &self,
214        route_id: &str,
215        pipeline: camel_api::BoxProcessor,
216    ) -> Result<(), CamelError> {
217        self.controller.swap_pipeline(route_id, pipeline).await
218    }
219
220    /// Stop the route via the reload path (graceful lifecycle drain).
221    pub(crate) async fn stop_route_reload(&self, route_id: &str) -> Result<(), CamelError> {
222        self.controller.stop_route_reload(route_id).await
223    }
224
225    /// Start the route via the reload path (re-create consumer).
226    pub(crate) async fn start_route_reload(&self, route_id: &str) -> Result<(), CamelError> {
227        self.controller.start_route_reload(route_id).await
228    }
229
230    /// Raw pipeline swap — bypasses the lifecycle/aggregate rejection check.
231    /// Only safe after the route has been stopped (Restart path).
232    pub(crate) async fn swap_route_pipeline_raw(
233        &self,
234        route_id: &str,
235        pipeline: camel_api::BoxProcessor,
236        lifecycle: Vec<Arc<dyn camel_api::StepLifecycle>>,
237    ) -> Result<(), CamelError> {
238        self.controller
239            .swap_pipeline_raw(route_id, pipeline, lifecycle)
240            .await
241    }
242
243    pub(crate) async fn execute_runtime_command(
244        &self,
245        cmd: camel_api::RuntimeCommand,
246    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
247        self.runtime.execute(cmd).await
248    }
249
250    pub(crate) async fn runtime_route_status(
251        &self,
252        route_id: &str,
253    ) -> Result<Option<String>, CamelError> {
254        match self
255            .runtime
256            .ask(camel_api::RuntimeQuery::GetRouteStatus {
257                route_id: route_id.to_string(),
258            })
259            .await
260        {
261            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
262            Ok(_) => Err(CamelError::RouteError(
263                "unexpected runtime query response for route status".to_string(),
264            )),
265            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
266            Err(err) => Err(err),
267        }
268    }
269
270    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
271        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
272            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
273            Ok(_) => Err(CamelError::RouteError(
274                "unexpected runtime query response for route listing".to_string(),
275            )),
276            Err(err) => Err(err),
277        }
278    }
279
280    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
281        self.controller.route_source_hash(route_id).await
282    }
283
284    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
285        if !self.controller.route_exists(route_id).await? {
286            return Err(CamelError::RouteError(format!(
287                "Route '{}' not found",
288                route_id
289            )));
290        }
291        Ok(self
292            .controller
293            .in_flight_count(route_id)
294            .await?
295            .unwrap_or(0))
296    }
297
298    /// Check whether the running route has lifecycle-bearing steps.
299    pub(crate) async fn route_has_lifecycle(&self, route_id: &str) -> bool {
300        self.controller
301            .route_has_lifecycle(route_id)
302            .await
303            .unwrap_or(false)
304    }
305
306    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
307        self.function_invoker.clone()
308    }
309
310    #[cfg(test)]
311    pub(crate) async fn force_start_route_for_test(
312        &self,
313        route_id: &str,
314    ) -> Result<(), CamelError> {
315        self.controller.start_route(route_id).await
316    }
317
318    pub async fn controller_route_count_for_test(&self) -> usize {
319        self.controller.route_count().await.unwrap_or(0)
320    }
321}
322
323impl CamelContext {
324    pub fn builder() -> CamelContextBuilder {
325        CamelContextBuilder::new()
326    }
327
328    /// Set a global error handler applied to all routes without a per-route handler.
329    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
330        let _ = self.route_controller.set_error_handler(config).await;
331    }
332
333    /// Enable or disable tracing globally.
334    pub async fn set_tracing(&mut self, enabled: bool) {
335        let _ = self
336            .route_controller
337            .set_tracer_config(TracerConfig {
338                enabled,
339                ..Default::default()
340            })
341            .await;
342    }
343
344    /// Configure tracing with full config.
345    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
346        // Inject metrics collector if not already set
347        let config = if config.metrics_collector.is_none() {
348            TracerConfig {
349                metrics_collector: Some(Arc::clone(&self.metrics)),
350                ..config
351            }
352        } else {
353            config
354        };
355
356        let _ = self.route_controller.set_tracer_config(config).await;
357    }
358
359    /// Builder-style: enable tracing with default config.
360    pub async fn with_tracing(mut self) -> Self {
361        self.set_tracing(true).await;
362        self
363    }
364
365    /// Builder-style: configure tracing with custom config.
366    /// Note: tracing subscriber initialization (stdout/file output) is handled
367    /// separately via init_tracing_subscriber (called in camel-config bridge).
368    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
369        self.set_tracer_config(config).await;
370        self
371    }
372
373    /// Register a lifecycle service (Apache Camel: addService pattern)
374    ///
375    /// For services exposing `as_function_invoker()`, the invoker is propagated
376    /// to the route controller so that subsequent route definitions with function
377    /// steps work correctly.
378    ///
379    /// Prefer [`CamelContextBuilder::with_lifecycle`] when possible, which wires
380    /// the invoker at build time before any routes are added.
381    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
382        if let Some(collector) = service.as_metrics_collector() {
383            self.metrics = collector;
384        }
385        if let Some(invoker) = service.as_function_invoker() {
386            self.function_invoker = Some(invoker.clone());
387            if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
388                tracing::warn!("Failed to propagate function invoker to route controller: {e}");
389            }
390        }
391
392        self.services.push(Box::new(service));
393        self
394    }
395
396    /// Register a component with this context.
397    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
398        info!(scheme = component.scheme(), "Registering component");
399        self.registry
400            .lock()
401            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
402            .register(Arc::new(component));
403    }
404
405    /// Register a language with this context, keyed by name.
406    ///
407    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
408    /// with the same name is already registered. Use
409    /// [`resolve_language`](Self::resolve_language) to check before
410    /// registering, or choose a distinct name.
411    pub fn register_language(
412        &mut self,
413        name: impl Into<String>,
414        lang: Box<dyn Language>,
415    ) -> Result<(), LanguageRegistryError> {
416        let name = name.into();
417        let mut languages = self
418            .languages
419            .lock()
420            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
421        if languages.contains_key(&name) {
422            return Err(LanguageRegistryError::AlreadyRegistered { name });
423        }
424        languages.insert(name, Arc::from(lang));
425        Ok(())
426    }
427
428    /// Resolve a language by name. Returns `None` if not registered.
429    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
430        let languages = self
431            .languages
432            .lock()
433            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
434        languages.get(name).cloned()
435    }
436
437    /// Add a route definition to this context.
438    ///
439    /// The route must have an ID. Steps are resolved immediately using registered components.
440    pub async fn add_route_definition(
441        &self,
442        definition: RouteDefinition,
443    ) -> Result<(), CamelError> {
444        use crate::lifecycle::ports::RouteRegistrationPort;
445        info!(
446            from = definition.from_uri(),
447            route_id = %definition.route_id(),
448            "Adding route definition"
449        );
450        self.runtime
451            .register_route(definition)
452            .await
453            .map_err(Into::into)
454    }
455
456    fn next_context_command_id(op: &str, route_id: &str) -> String {
457        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
458        format!("context:{op}:{route_id}:{seq}")
459    }
460
461    /// Access the component registry.
462    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
463        self.registry
464            .lock()
465            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
466    }
467
468    /// Access the shared component registry Arc.
469    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
470        Arc::clone(&self.registry)
471    }
472
473    /// Get runtime execution handle for file-watcher integrations.
474    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
475        RuntimeExecutionHandle {
476            controller: self.route_controller.clone(),
477            runtime: Arc::clone(&self.runtime),
478            function_invoker: self.function_invoker.clone(),
479            #[cfg(test)]
480            test_lifecycle_inject: Arc::new(std::sync::Mutex::new(None)),
481        }
482    }
483
484    /// Get the metrics collector.
485    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
486        Arc::clone(&self.metrics)
487    }
488
489    /// Get the platform service.
490    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
491        Arc::clone(&self.platform_service)
492    }
493
494    /// Get the readiness gate port.
495    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
496        self.platform_service.readiness_gate()
497    }
498
499    /// Get the platform identity.
500    pub fn platform_identity(&self) -> PlatformIdentity {
501        self.platform_service.identity()
502    }
503
504    /// Get the leadership service port.
505    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
506        self.platform_service.leadership()
507    }
508
509    /// Get runtime command/query bus handle.
510    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
511        self.runtime.clone()
512    }
513
514    /// Build a producer context wired to this runtime.
515    pub fn producer_context(&self) -> camel_api::ProducerContext {
516        camel_api::ProducerContext::new().with_runtime(self.runtime())
517    }
518
519    /// Query route status via runtime read-model.
520    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
521        match self
522            .runtime()
523            .ask(camel_api::RuntimeQuery::GetRouteStatus {
524                route_id: route_id.to_string(),
525            })
526            .await
527        {
528            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
529            Ok(_) => Err(CamelError::RouteError(
530                "unexpected runtime query response for route status".to_string(),
531            )),
532            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
533            Err(err) => Err(err),
534        }
535    }
536
537    /// Start all routes. Each route's consumer will begin producing exchanges.
538    ///
539    /// Only routes with `auto_startup == true` will be started, in order of their
540    /// `startup_order` (lower values start first).
541    pub async fn start(&mut self) -> Result<(), CamelError> {
542        info!("Starting CamelContext");
543
544        // Start lifecycle services first
545        for (i, service) in self.services.iter_mut().enumerate() {
546            info!("Starting service: {}", service.name());
547            if let Err(e) = service.start().await {
548                // Rollback: stop already started services in reverse order
549                warn!(
550                    "Service {} failed to start, rolling back {} services",
551                    service.name(),
552                    i
553                );
554                for j in (0..i).rev() {
555                    if let Err(rollback_err) = self.services[j].stop().await {
556                        warn!(
557                            "Failed to stop service {} during rollback: {}",
558                            self.services[j].name(),
559                            rollback_err
560                        );
561                    }
562                }
563                return Err(e);
564            }
565        }
566
567        // Then start routes via runtime command bus (aggregate-first),
568        // preserving route controller startup ordering metadata.
569        let route_ids = self.route_controller.auto_startup_route_ids().await?;
570        for route_id in route_ids {
571            self.runtime
572                .execute(camel_api::RuntimeCommand::StartRoute {
573                    route_id: route_id.clone(),
574                    command_id: Self::next_context_command_id("start", &route_id),
575                    causation_id: None,
576                })
577                .await?;
578        }
579
580        info!("CamelContext started");
581        Ok(())
582    }
583
584    /// Graceful shutdown with default 30-second timeout.
585    pub async fn stop(&mut self) -> Result<(), CamelError> {
586        self.stop_timeout(self.shutdown_timeout).await
587    }
588
589    /// Graceful shutdown with custom timeout.
590    ///
591    /// Note: The timeout parameter is currently not propagated to the
592    /// RouteController's per-route shutdown timeout. The RouteController
593    /// uses a hardcoded 5-second default (`DEFAULT_SHUTDOWN_TIMEOUT`).
594    /// Full propagation is planned for a future version.
595    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
596        info!("Stopping CamelContext");
597
598        // Signal cancellation (for any legacy code that might use it)
599        self.cancel_token.cancel();
600        if let Some(join) = self.supervision_join.take() {
601            join.abort();
602        }
603
604        // Stop all routes via runtime command bus (aggregate-first),
605        // preserving route controller shutdown ordering metadata.
606        let route_ids = self.route_controller.shutdown_route_ids().await?;
607        for route_id in route_ids {
608            if let Err(err) = self
609                .runtime
610                .execute(camel_api::RuntimeCommand::StopRoute {
611                    route_id: route_id.clone(),
612                    command_id: Self::next_context_command_id("stop", &route_id),
613                    causation_id: None,
614                })
615                .await
616            {
617                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
618            }
619        }
620
621        self.health_registry.cancel_token().cancel();
622
623        // Then stop lifecycle services in reverse insertion order (LIFO)
624        // Continue stopping all services even if some fail
625        let mut first_error = None;
626        for service in self.services.iter_mut().rev() {
627            info!("Stopping service: {}", service.name());
628            if let Err(e) = service.stop().await {
629                warn!("Service {} failed to stop: {}", service.name(), e);
630                if first_error.is_none() {
631                    first_error = Some(e);
632                }
633            }
634        }
635
636        info!("CamelContext stopped");
637
638        if let Some(e) = first_error {
639            Err(e)
640        } else {
641            Ok(())
642        }
643    }
644
645    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
646    pub fn shutdown_timeout(&self) -> std::time::Duration {
647        self.shutdown_timeout
648    }
649
650    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
651    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
652        self.shutdown_timeout = timeout;
653    }
654
655    /// Immediate abort — kills all tasks without draining.
656    pub async fn abort(&mut self) {
657        self.cancel_token.cancel();
658        if let Some(join) = self.supervision_join.take() {
659            join.abort();
660        }
661        let route_ids = self
662            .route_controller
663            .shutdown_route_ids()
664            .await
665            .unwrap_or_default();
666        for route_id in route_ids {
667            let _ = self
668                .runtime
669                .execute(camel_api::RuntimeCommand::StopRoute {
670                    route_id: route_id.clone(),
671                    command_id: Self::next_context_command_id("abort-stop", &route_id),
672                    causation_id: None,
673                })
674                .await;
675        }
676
677        for service in self.services.iter_mut().rev() {
678            let name = service.name().to_string();
679            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
680                Ok(Ok(())) => info!("Aborted service: {}", name),
681                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
682                Err(_) => warn!("Service {} timed out during abort (5s)", name),
683            }
684        }
685    }
686
687    /// Check health status of all registered services and lifecycle services.
688    pub async fn health_check(&self) -> HealthReport {
689        use camel_api::HealthSource;
690        self.health_report().await
691    }
692
693    pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
694        Arc::clone(&self.health_registry)
695    }
696
697    /// Store a component config. Overwrites any previously stored config of the same type.
698    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
699        self.component_configs
700            .insert(TypeId::of::<T>(), Box::new(config));
701    }
702
703    /// Retrieve a stored component config by type. Returns None if not stored.
704    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
705        self.component_configs
706            .get(&TypeId::of::<T>())
707            .and_then(|b| b.downcast_ref::<T>())
708    }
709
710    // --- Route Template Registry (data-only) ---
711
712    /// Register a route template specification.
713    ///
714    /// Returns `Err(CamelError)` if a template with the same ID is already registered.
715    pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
716        self.template_registry.register(spec)
717    }
718
719    /// Retrieve a route template specification by its ID.
720    pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
721        self.template_registry.get(id)
722    }
723
724    /// Return all registered template IDs.
725    pub fn template_ids(&self) -> Vec<String> {
726        self.template_registry.template_ids()
727    }
728
729    /// Record a newly instantiated template instance.
730    pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
731        self.template_registry.record_instance(record)
732    }
733
734    /// Return all instance records for a given template ID.
735    pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
736        self.template_registry.instances(template_id)
737    }
738
739    // --- Idempotent Repository Registry ---
740
741    /// Register an idempotent repository.
742    ///
743    /// Returns `Err(RegistryError::AlreadyRegistered)` if a repository with
744    /// the same name is already registered.
745    pub fn register_idempotent_repository(
746        &mut self,
747        name: impl Into<String>,
748        repo: Arc<dyn camel_api::IdempotentRepository>,
749    ) -> Result<(), RegistryError> {
750        self.idempotent_repositories.register(name, repo)
751    }
752
753    /// Retrieve an idempotent repository by name.
754    pub fn idempotent_repository(
755        &self,
756        name: &str,
757    ) -> Option<Arc<dyn camel_api::IdempotentRepository>> {
758        self.idempotent_repositories.get(name)
759    }
760
761    // --- Claim Check Repository Registry ---
762
763    /// Register a claim check repository.
764    ///
765    /// Returns `Err(RegistryError::AlreadyRegistered)` if a repository with
766    /// the same name is already registered.
767    pub fn register_claim_check_repository(
768        &mut self,
769        name: impl Into<String>,
770        repo: Arc<dyn camel_api::ClaimCheckRepository>,
771    ) -> Result<(), RegistryError> {
772        self.claim_check_repositories.register(name, repo)
773    }
774
775    /// Retrieve a claim check repository by name.
776    pub fn claim_check_repository(
777        &self,
778        name: &str,
779    ) -> Option<Arc<dyn camel_api::ClaimCheckRepository>> {
780        self.claim_check_repositories.get(name)
781    }
782}
783
784impl ComponentRegistrar for CamelContext {
785    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
786        self.registry
787            .lock()
788            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
789            .register(component);
790    }
791}
792
793impl ComponentContext for CamelContext {
794    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
795        self.registry.lock().ok()?.get(scheme)
796    }
797
798    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
799        self.languages.lock().ok()?.get(name).cloned()
800    }
801
802    fn metrics(&self) -> Arc<dyn MetricsCollector> {
803        Arc::clone(&self.metrics)
804    }
805
806    fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
807        // The concrete HealthCheckRegistry struct implements the trait via
808        // the impl added in health_registry.rs.
809        Arc::clone(&self.health_registry) as Arc<dyn camel_component_api::HealthCheckRegistry>
810    }
811
812    fn platform_service(&self) -> Arc<dyn PlatformService> {
813        Arc::clone(&self.platform_service)
814    }
815
816    fn register_route_health_check(
817        &self,
818        route_id: &str,
819        check: Arc<dyn camel_api::AsyncHealthCheck>,
820    ) {
821        self.health_registry.register_for_route(route_id, check);
822    }
823
824    fn unregister_route_health_check(&self, route_id: &str) {
825        self.health_registry.unregister_for_route(route_id);
826    }
827}
828
829#[async_trait::async_trait]
830impl camel_api::HealthSource for CamelContext {
831    async fn liveness(&self) -> camel_api::HealthStatus {
832        let has_failed = self
833            .services
834            .iter()
835            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
836        if has_failed {
837            camel_api::HealthStatus::Unhealthy
838        } else {
839            camel_api::HealthStatus::Healthy
840        }
841    }
842
843    async fn readiness(&self) -> camel_api::HealthStatus {
844        let has_failed = self
845            .services
846            .iter()
847            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
848        if has_failed {
849            return camel_api::HealthStatus::Unhealthy;
850        }
851        let has_stopped = self
852            .services
853            .iter()
854            .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
855        if has_stopped {
856            return camel_api::HealthStatus::Degraded;
857        }
858        self.health_registry.check_all().await.status
859    }
860
861    async fn health_report(&self) -> camel_api::HealthReport {
862        let mut report = self.health_registry.check_all().await;
863        let mut worst = report.status;
864        for service in &self.services {
865            let svc_status = service.status();
866            let health = match svc_status {
867                camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
868                camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
869                camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
870            };
871            if matches!(worst, camel_api::HealthStatus::Healthy)
872                && matches!(
873                    health,
874                    camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
875                )
876            {
877                worst = health;
878            }
879            if matches!(worst, camel_api::HealthStatus::Degraded)
880                && matches!(health, camel_api::HealthStatus::Unhealthy)
881            {
882                worst = health;
883            }
884            report.services.push(camel_api::ServiceHealth {
885                name: service.name().to_string(),
886                status: svc_status,
887                message: None,
888            });
889        }
890        report.status = worst;
891        report
892    }
893
894    async fn startup(&self) -> camel_api::HealthStatus {
895        camel_api::HealthStatus::Healthy
896    }
897}
898
899#[cfg(test)]
900#[path = "context_tests.rs"]
901mod context_tests;