// camel_core/context.rs

use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::{
    CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, NoOpMetrics,
    NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RouteTemplateSpec,
    RuntimeCommandBus, RuntimeQueryBus, SupervisionConfig, TemplateInstanceRecord,
};
use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
use camel_language_api::Language;

use crate::health_registry::HealthCheckRegistry;
use crate::lifecycle::adapters::RuntimeExecutionAdapter;
use crate::lifecycle::adapters::controller_actor::{
    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
};
use crate::lifecycle::adapters::route_controller::{
    DefaultRouteController, SharedLanguageRegistry,
};
use crate::lifecycle::application::route_definition::RouteDefinition;
use crate::lifecycle::application::runtime_bus::RuntimeBus;
use crate::lifecycle::domain::LanguageRegistryError;
use crate::lifecycle::ports::RuntimeExecutionPort;
use crate::shared::components::domain::Registry;
use crate::shared::observability::domain::TracerConfig;
use crate::template::TemplateRegistry;

/// Process-wide sequence used to make context-issued runtime command ids unique.
///
/// Being a `static`, the counter is shared by every `CamelContext` in the
/// process; it is only ever advanced with `fetch_add`.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
35
36type ExecutionFactory =
37    Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
38
39pub struct CamelContextBuilder {
40    registry: Option<Arc<std::sync::Mutex<Registry>>>,
41    languages: Option<SharedLanguageRegistry>,
42    metrics: Option<Arc<dyn MetricsCollector>>,
43    // Platform ports
44    platform_service: Option<Arc<dyn PlatformService>>,
45    supervision_config: Option<SupervisionConfig>,
46    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
47    shutdown_timeout: std::time::Duration,
48    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
49    function_invoker: Option<Arc<dyn FunctionInvoker>>,
50    lifecycle_services: Vec<Box<dyn Lifecycle>>,
51    execution_factory: Option<ExecutionFactory>,
52    health_registry: Option<Arc<HealthCheckRegistry>>,
53    template_registry: Option<Arc<TemplateRegistry>>,
54}
55
56/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
57///
58/// # Lifecycle
59///
60/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
61/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
62/// stopped context is not supported — create a new instance instead.
63pub struct CamelContext {
64    registry: Arc<std::sync::Mutex<Registry>>,
65    route_controller: RouteControllerHandle,
66    _actor_join: tokio::task::JoinHandle<()>,
67    supervision_join: Option<tokio::task::JoinHandle<()>>,
68    runtime: Arc<RuntimeBus>,
69    cancel_token: CancellationToken,
70    metrics: Arc<dyn MetricsCollector>,
71    // Platform ports
72    platform_service: Arc<dyn PlatformService>,
73    languages: SharedLanguageRegistry,
74    shutdown_timeout: std::time::Duration,
75    services: Vec<Box<dyn Lifecycle>>,
76    health_registry: Arc<HealthCheckRegistry>,
77    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
78    function_invoker: Option<Arc<dyn FunctionInvoker>>,
79    template_registry: Arc<TemplateRegistry>,
80}
81
82/// Opaque handle for runtime side-effect execution operations.
83///
84/// This intentionally does not expose direct lifecycle mutation APIs to callers.
85#[derive(Clone)]
86pub struct RuntimeExecutionHandle {
87    controller: RouteControllerHandle,
88    runtime: Arc<RuntimeBus>,
89    function_invoker: Option<Arc<dyn FunctionInvoker>>,
90}
91
92impl RuntimeExecutionHandle {
93    pub(crate) async fn add_route_definition(
94        &self,
95        definition: RouteDefinition,
96    ) -> Result<(), CamelError> {
97        use crate::lifecycle::ports::RouteRegistrationPort;
98        self.runtime
99            .register_route(definition)
100            .await
101            .map_err(Into::into)
102    }
103
104    pub(crate) async fn compile_route_definition(
105        &self,
106        definition: RouteDefinition,
107    ) -> Result<camel_api::BoxProcessor, CamelError> {
108        self.controller.compile_route_definition(definition).await
109    }
110
111    pub(crate) async fn compile_route_definition_with_generation(
112        &self,
113        definition: RouteDefinition,
114        generation: u64,
115    ) -> Result<camel_api::BoxProcessor, CamelError> {
116        self.controller
117            .compile_route_definition_with_generation(definition, generation)
118            .await
119    }
120
121    pub(crate) async fn prepare_route_definition_with_generation(
122        &self,
123        definition: RouteDefinition,
124        generation: u64,
125    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
126        self.controller
127            .prepare_route_definition_with_generation(definition, generation)
128            .await
129    }
130
131    pub(crate) async fn insert_prepared_route(
132        &self,
133        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
134    ) -> Result<(), CamelError> {
135        self.controller.insert_prepared_route(prepared).await
136    }
137
138    pub(crate) async fn remove_route_preserving_functions(
139        &self,
140        route_id: String,
141    ) -> Result<(), CamelError> {
142        self.controller
143            .remove_route_preserving_functions(route_id)
144            .await
145    }
146
147    pub(crate) async fn register_route_aggregate(
148        &self,
149        route_id: String,
150    ) -> Result<(), CamelError> {
151        self.runtime.register_aggregate_only(route_id).await
152    }
153
154    pub(crate) async fn swap_route_pipeline(
155        &self,
156        route_id: &str,
157        pipeline: camel_api::BoxProcessor,
158    ) -> Result<(), CamelError> {
159        self.controller.swap_pipeline(route_id, pipeline).await
160    }
161
162    pub(crate) async fn execute_runtime_command(
163        &self,
164        cmd: camel_api::RuntimeCommand,
165    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
166        self.runtime.execute(cmd).await
167    }
168
169    pub(crate) async fn runtime_route_status(
170        &self,
171        route_id: &str,
172    ) -> Result<Option<String>, CamelError> {
173        match self
174            .runtime
175            .ask(camel_api::RuntimeQuery::GetRouteStatus {
176                route_id: route_id.to_string(),
177            })
178            .await
179        {
180            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
181            Ok(_) => Err(CamelError::RouteError(
182                "unexpected runtime query response for route status".to_string(),
183            )),
184            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
185            Err(err) => Err(err),
186        }
187    }
188
189    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
190        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
191            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
192            Ok(_) => Err(CamelError::RouteError(
193                "unexpected runtime query response for route listing".to_string(),
194            )),
195            Err(err) => Err(err),
196        }
197    }
198
199    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
200        self.controller.route_source_hash(route_id).await
201    }
202
203    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
204        if !self.controller.route_exists(route_id).await? {
205            return Err(CamelError::RouteError(format!(
206                "Route '{}' not found",
207                route_id
208            )));
209        }
210        Ok(self
211            .controller
212            .in_flight_count(route_id)
213            .await?
214            .unwrap_or(0))
215    }
216
217    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
218        self.function_invoker.clone()
219    }
220
221    #[cfg(test)]
222    pub(crate) async fn force_start_route_for_test(
223        &self,
224        route_id: &str,
225    ) -> Result<(), CamelError> {
226        self.controller.start_route(route_id).await
227    }
228
229    pub async fn controller_route_count_for_test(&self) -> usize {
230        self.controller.route_count().await.unwrap_or(0)
231    }
232}
233
234impl CamelContext {
235    fn built_in_languages() -> SharedLanguageRegistry {
236        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
237        languages.insert(
238            "simple".to_string(),
239            Arc::new(camel_language_simple::SimpleLanguage::new()),
240        );
241        #[cfg(feature = "lang-js")]
242        {
243            let js_lang = camel_language_js::JsLanguage::new();
244            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
245            languages.insert("javascript".to_string(), Arc::new(js_lang));
246        }
247        #[cfg(feature = "lang-rhai")]
248        {
249            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
250            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
251        }
252        #[cfg(feature = "lang-jsonpath")]
253        {
254            languages.insert(
255                "jsonpath".to_string(),
256                Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
257            );
258        }
259        #[cfg(feature = "lang-xpath")]
260        {
261            languages.insert(
262                "xpath".to_string(),
263                Arc::new(camel_language_xpath::XPathLanguage::new()),
264            );
265        }
266        Arc::new(std::sync::Mutex::new(languages))
267    }
268
269    fn build_runtime(
270        controller: RouteControllerHandle,
271        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
272        execution_factory: Option<ExecutionFactory>,
273        health_registry: Arc<HealthCheckRegistry>,
274    ) -> Arc<RuntimeBus> {
275        let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
276            factory(controller.clone())
277        } else {
278            Arc::new(RuntimeExecutionAdapter::new(controller))
279        };
280        Arc::new(
281            RuntimeBus::new(
282                Arc::new(store.clone()),
283                Arc::new(store.clone()),
284                Arc::new(store.clone()),
285                Arc::new(store.clone()),
286            )
287            .with_uow(Arc::new(store))
288            .with_execution(execution)
289            .with_health_registry(health_registry),
290        )
291    }
292
293    pub fn builder() -> CamelContextBuilder {
294        CamelContextBuilder::new()
295    }
296
297    /// Set a global error handler applied to all routes without a per-route handler.
298    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
299        let _ = self.route_controller.set_error_handler(config).await;
300    }
301
302    /// Enable or disable tracing globally.
303    pub async fn set_tracing(&mut self, enabled: bool) {
304        let _ = self
305            .route_controller
306            .set_tracer_config(TracerConfig {
307                enabled,
308                ..Default::default()
309            })
310            .await;
311    }
312
313    /// Configure tracing with full config.
314    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
315        // Inject metrics collector if not already set
316        let config = if config.metrics_collector.is_none() {
317            TracerConfig {
318                metrics_collector: Some(Arc::clone(&self.metrics)),
319                ..config
320            }
321        } else {
322            config
323        };
324
325        let _ = self.route_controller.set_tracer_config(config).await;
326    }
327
328    /// Builder-style: enable tracing with default config.
329    pub async fn with_tracing(mut self) -> Self {
330        self.set_tracing(true).await;
331        self
332    }
333
334    /// Builder-style: configure tracing with custom config.
335    /// Note: tracing subscriber initialization (stdout/file output) is handled
336    /// separately via init_tracing_subscriber (called in camel-config bridge).
337    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
338        self.set_tracer_config(config).await;
339        self
340    }
341
342    /// Register a lifecycle service (Apache Camel: addService pattern)
343    ///
344    /// For services exposing `as_function_invoker()`, the invoker is propagated
345    /// to the route controller so that subsequent route definitions with function
346    /// steps work correctly.
347    ///
348    /// Prefer [`CamelContextBuilder::with_lifecycle`] when possible, which wires
349    /// the invoker at build time before any routes are added.
350    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
351        if let Some(collector) = service.as_metrics_collector() {
352            self.metrics = collector;
353        }
354        if let Some(invoker) = service.as_function_invoker() {
355            self.function_invoker = Some(invoker.clone());
356            if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
357                tracing::warn!("Failed to propagate function invoker to route controller: {e}");
358            }
359        }
360
361        self.services.push(Box::new(service));
362        self
363    }
364
365    /// Register a component with this context.
366    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
367        info!(scheme = component.scheme(), "Registering component");
368        self.registry
369            .lock()
370            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
371            .register(Arc::new(component));
372    }
373
374    /// Register a language with this context, keyed by name.
375    ///
376    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
377    /// with the same name is already registered. Use
378    /// [`resolve_language`](Self::resolve_language) to check before
379    /// registering, or choose a distinct name.
380    pub fn register_language(
381        &mut self,
382        name: impl Into<String>,
383        lang: Box<dyn Language>,
384    ) -> Result<(), LanguageRegistryError> {
385        let name = name.into();
386        let mut languages = self
387            .languages
388            .lock()
389            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
390        if languages.contains_key(&name) {
391            return Err(LanguageRegistryError::AlreadyRegistered { name });
392        }
393        languages.insert(name, Arc::from(lang));
394        Ok(())
395    }
396
397    /// Resolve a language by name. Returns `None` if not registered.
398    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
399        let languages = self
400            .languages
401            .lock()
402            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
403        languages.get(name).cloned()
404    }
405
406    /// Add a route definition to this context.
407    ///
408    /// The route must have an ID. Steps are resolved immediately using registered components.
409    pub async fn add_route_definition(
410        &self,
411        definition: RouteDefinition,
412    ) -> Result<(), CamelError> {
413        use crate::lifecycle::ports::RouteRegistrationPort;
414        info!(
415            from = definition.from_uri(),
416            route_id = %definition.route_id(),
417            "Adding route definition"
418        );
419        self.runtime
420            .register_route(definition)
421            .await
422            .map_err(Into::into)
423    }
424
425    fn next_context_command_id(op: &str, route_id: &str) -> String {
426        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
427        format!("context:{op}:{route_id}:{seq}")
428    }
429
430    /// Access the component registry.
431    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
432        self.registry
433            .lock()
434            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
435    }
436
437    /// Access the shared component registry Arc.
438    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
439        Arc::clone(&self.registry)
440    }
441
442    /// Get runtime execution handle for file-watcher integrations.
443    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
444        RuntimeExecutionHandle {
445            controller: self.route_controller.clone(),
446            runtime: Arc::clone(&self.runtime),
447            function_invoker: self.function_invoker.clone(),
448        }
449    }
450
451    /// Get the metrics collector.
452    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
453        Arc::clone(&self.metrics)
454    }
455
456    /// Get the platform service.
457    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
458        Arc::clone(&self.platform_service)
459    }
460
461    /// Get the readiness gate port.
462    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
463        self.platform_service.readiness_gate()
464    }
465
466    /// Get the platform identity.
467    pub fn platform_identity(&self) -> PlatformIdentity {
468        self.platform_service.identity()
469    }
470
471    /// Get the leadership service port.
472    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
473        self.platform_service.leadership()
474    }
475
476    /// Get runtime command/query bus handle.
477    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
478        self.runtime.clone()
479    }
480
481    /// Build a producer context wired to this runtime.
482    pub fn producer_context(&self) -> camel_api::ProducerContext {
483        camel_api::ProducerContext::new().with_runtime(self.runtime())
484    }
485
486    /// Query route status via runtime read-model.
487    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
488        match self
489            .runtime()
490            .ask(camel_api::RuntimeQuery::GetRouteStatus {
491                route_id: route_id.to_string(),
492            })
493            .await
494        {
495            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
496            Ok(_) => Err(CamelError::RouteError(
497                "unexpected runtime query response for route status".to_string(),
498            )),
499            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
500            Err(err) => Err(err),
501        }
502    }
503
504    /// Start all routes. Each route's consumer will begin producing exchanges.
505    ///
506    /// Only routes with `auto_startup == true` will be started, in order of their
507    /// `startup_order` (lower values start first).
508    pub async fn start(&mut self) -> Result<(), CamelError> {
509        info!("Starting CamelContext");
510
511        // Start lifecycle services first
512        for (i, service) in self.services.iter_mut().enumerate() {
513            info!("Starting service: {}", service.name());
514            if let Err(e) = service.start().await {
515                // Rollback: stop already started services in reverse order
516                warn!(
517                    "Service {} failed to start, rolling back {} services",
518                    service.name(),
519                    i
520                );
521                for j in (0..i).rev() {
522                    if let Err(rollback_err) = self.services[j].stop().await {
523                        warn!(
524                            "Failed to stop service {} during rollback: {}",
525                            self.services[j].name(),
526                            rollback_err
527                        );
528                    }
529                }
530                return Err(e);
531            }
532        }
533
534        // Then start routes via runtime command bus (aggregate-first),
535        // preserving route controller startup ordering metadata.
536        let route_ids = self.route_controller.auto_startup_route_ids().await?;
537        for route_id in route_ids {
538            self.runtime
539                .execute(camel_api::RuntimeCommand::StartRoute {
540                    route_id: route_id.clone(),
541                    command_id: Self::next_context_command_id("start", &route_id),
542                    causation_id: None,
543                })
544                .await?;
545        }
546
547        info!("CamelContext started");
548        Ok(())
549    }
550
551    /// Graceful shutdown with default 30-second timeout.
552    pub async fn stop(&mut self) -> Result<(), CamelError> {
553        self.stop_timeout(self.shutdown_timeout).await
554    }
555
556    /// Graceful shutdown with custom timeout.
557    ///
558    /// Note: The timeout parameter is currently not propagated to the
559    /// RouteController's per-route shutdown timeout. The RouteController
560    /// uses a hardcoded 5-second default (`DEFAULT_SHUTDOWN_TIMEOUT`).
561    /// Full propagation is planned for a future version.
562    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
563        info!("Stopping CamelContext");
564
565        // Signal cancellation (for any legacy code that might use it)
566        self.cancel_token.cancel();
567        if let Some(join) = self.supervision_join.take() {
568            join.abort();
569        }
570
571        // Stop all routes via runtime command bus (aggregate-first),
572        // preserving route controller shutdown ordering metadata.
573        let route_ids = self.route_controller.shutdown_route_ids().await?;
574        for route_id in route_ids {
575            if let Err(err) = self
576                .runtime
577                .execute(camel_api::RuntimeCommand::StopRoute {
578                    route_id: route_id.clone(),
579                    command_id: Self::next_context_command_id("stop", &route_id),
580                    causation_id: None,
581                })
582                .await
583            {
584                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
585            }
586        }
587
588        self.health_registry.cancel_token().cancel();
589
590        // Then stop lifecycle services in reverse insertion order (LIFO)
591        // Continue stopping all services even if some fail
592        let mut first_error = None;
593        for service in self.services.iter_mut().rev() {
594            info!("Stopping service: {}", service.name());
595            if let Err(e) = service.stop().await {
596                warn!("Service {} failed to stop: {}", service.name(), e);
597                if first_error.is_none() {
598                    first_error = Some(e);
599                }
600            }
601        }
602
603        info!("CamelContext stopped");
604
605        if let Some(e) = first_error {
606            Err(e)
607        } else {
608            Ok(())
609        }
610    }
611
612    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
613    pub fn shutdown_timeout(&self) -> std::time::Duration {
614        self.shutdown_timeout
615    }
616
617    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
618    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
619        self.shutdown_timeout = timeout;
620    }
621
622    /// Immediate abort — kills all tasks without draining.
623    pub async fn abort(&mut self) {
624        self.cancel_token.cancel();
625        if let Some(join) = self.supervision_join.take() {
626            join.abort();
627        }
628        let route_ids = self
629            .route_controller
630            .shutdown_route_ids()
631            .await
632            .unwrap_or_default();
633        for route_id in route_ids {
634            let _ = self
635                .runtime
636                .execute(camel_api::RuntimeCommand::StopRoute {
637                    route_id: route_id.clone(),
638                    command_id: Self::next_context_command_id("abort-stop", &route_id),
639                    causation_id: None,
640                })
641                .await;
642        }
643
644        for service in self.services.iter_mut().rev() {
645            let name = service.name().to_string();
646            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
647                Ok(Ok(())) => info!("Aborted service: {}", name),
648                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
649                Err(_) => warn!("Service {} timed out during abort (5s)", name),
650            }
651        }
652    }
653
654    /// Check health status of all registered services and lifecycle services.
655    pub async fn health_check(&self) -> HealthReport {
656        use camel_api::HealthSource;
657        self.health_report().await
658    }
659
660    pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
661        Arc::clone(&self.health_registry)
662    }
663
664    /// Store a component config. Overwrites any previously stored config of the same type.
665    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
666        self.component_configs
667            .insert(TypeId::of::<T>(), Box::new(config));
668    }
669
670    /// Retrieve a stored component config by type. Returns None if not stored.
671    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
672        self.component_configs
673            .get(&TypeId::of::<T>())
674            .and_then(|b| b.downcast_ref::<T>())
675    }
676
677    // --- Route Template Registry (data-only) ---
678
679    /// Register a route template specification.
680    ///
681    /// Returns `Err(CamelError)` if a template with the same ID is already registered.
682    pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
683        self.template_registry.register(spec)
684    }
685
686    /// Retrieve a route template specification by its ID.
687    pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
688        self.template_registry.get(id)
689    }
690
691    /// Return all registered template IDs.
692    pub fn template_ids(&self) -> Vec<String> {
693        self.template_registry.template_ids()
694    }
695
696    /// Record a newly instantiated template instance.
697    pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
698        self.template_registry.record_instance(record)
699    }
700
701    /// Return all instance records for a given template ID.
702    pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
703        self.template_registry.instances(template_id)
704    }
705}
706
707impl ComponentRegistrar for CamelContext {
708    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
709        self.registry
710            .lock()
711            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
712            .register(component);
713    }
714}
715
716impl ComponentContext for CamelContext {
717    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
718        self.registry.lock().ok()?.get(scheme)
719    }
720
721    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
722        self.languages.lock().ok()?.get(name).cloned()
723    }
724
725    fn metrics(&self) -> Arc<dyn MetricsCollector> {
726        Arc::clone(&self.metrics)
727    }
728
729    fn platform_service(&self) -> Arc<dyn PlatformService> {
730        Arc::clone(&self.platform_service)
731    }
732
733    fn register_route_health_check(
734        &self,
735        route_id: &str,
736        check: Arc<dyn camel_api::AsyncHealthCheck>,
737    ) {
738        self.health_registry.register_for_route(route_id, check);
739    }
740
741    fn unregister_route_health_check(&self, route_id: &str) {
742        self.health_registry.unregister_for_route(route_id);
743    }
744}
745
746#[async_trait::async_trait]
747impl camel_api::HealthSource for CamelContext {
748    async fn liveness(&self) -> camel_api::HealthStatus {
749        let has_failed = self
750            .services
751            .iter()
752            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
753        if has_failed {
754            camel_api::HealthStatus::Unhealthy
755        } else {
756            camel_api::HealthStatus::Healthy
757        }
758    }
759
760    async fn readiness(&self) -> camel_api::HealthStatus {
761        let has_failed = self
762            .services
763            .iter()
764            .any(|s| s.status() == camel_api::ServiceStatus::Failed);
765        if has_failed {
766            return camel_api::HealthStatus::Unhealthy;
767        }
768        let has_stopped = self
769            .services
770            .iter()
771            .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
772        if has_stopped {
773            return camel_api::HealthStatus::Degraded;
774        }
775        self.health_registry.check_all().await.status
776    }
777
778    async fn health_report(&self) -> camel_api::HealthReport {
779        let mut report = self.health_registry.check_all().await;
780        let mut worst = report.status;
781        for service in &self.services {
782            let svc_status = service.status();
783            let health = match svc_status {
784                camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
785                camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
786                camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
787            };
788            if matches!(worst, camel_api::HealthStatus::Healthy)
789                && matches!(
790                    health,
791                    camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
792                )
793            {
794                worst = health;
795            }
796            if matches!(worst, camel_api::HealthStatus::Degraded)
797                && matches!(health, camel_api::HealthStatus::Unhealthy)
798            {
799                worst = health;
800            }
801            report.services.push(camel_api::ServiceHealth {
802                name: service.name().to_string(),
803                status: svc_status,
804                message: None,
805            });
806        }
807        report.status = worst;
808        report
809    }
810
811    async fn startup(&self) -> camel_api::HealthStatus {
812        camel_api::HealthStatus::Healthy
813    }
814}
815
816impl CamelContextBuilder {
817    pub fn new() -> Self {
818        Self {
819            registry: None,
820            languages: None,
821            metrics: None,
822            platform_service: None,
823            supervision_config: None,
824            runtime_store: None,
825            shutdown_timeout: std::time::Duration::from_secs(5),
826            beans: None,
827            function_invoker: None,
828            lifecycle_services: Vec::new(),
829            execution_factory: None,
830            health_registry: None,
831            template_registry: None,
832        }
833    }
834
835    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
836        self.registry = Some(registry);
837        self
838    }
839
840    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
841        self.languages = Some(languages);
842        self
843    }
844
845    pub fn with_execution_factory(
846        mut self,
847        factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
848    ) -> Self {
849        self.execution_factory = Some(Arc::new(factory));
850        self
851    }
852
853    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
854        self.metrics = Some(metrics);
855        self
856    }
857
858    /// Set a custom platform service.
859    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
860        self.platform_service = Some(platform_service);
861        self
862    }
863
864    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
865        self.supervision_config = Some(config);
866        self
867    }
868
869    pub fn runtime_store(
870        mut self,
871        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
872    ) -> Self {
873        self.runtime_store = Some(store);
874        self
875    }
876
877    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
878        self.shutdown_timeout = timeout;
879        self
880    }
881
882    pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
883        self.health_registry = Some(registry);
884        self
885    }
886
887    /// Inject a shared `BeanRegistry` for bean resolution across routes.
888    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
889        self.beans = Some(beans);
890        self
891    }
892
893    /// Register a lifecycle service (e.g., FunctionRuntimeService) at builder time.
894    ///
895    /// This is the recommended path for services that need to be wired into the
896    /// route controller before any routes are added. The function invoker (if any)
897    /// is extracted and passed to the `DefaultRouteController` during `build()`.
898    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
899        if let Some(collector) = service.as_metrics_collector() {
900            self.metrics = Some(collector);
901        }
902        if let Some(invoker) = service.as_function_invoker() {
903            self.function_invoker = Some(invoker);
904        }
905        self.lifecycle_services.push(Box::new(service));
906        self
907    }
908
909    /// Set a custom `TemplateRegistry` for route template storage.
910    ///
911    /// If not provided, a default empty registry is created during `build()`.
912    pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
913        self.template_registry = Some(registry);
914        self
915    }
916
917    pub async fn build(self) -> Result<CamelContext, CamelError> {
918        let registry = self
919            .registry
920            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
921        let languages = self
922            .languages
923            .unwrap_or_else(CamelContext::built_in_languages);
924        let simple_with_resolver: Arc<dyn Language> = Arc::new(
925            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
926                let languages = Arc::clone(&languages);
927                move |name| {
928                    languages
929                        .lock()
930                        .ok()
931                        .and_then(|registry| registry.get(name).cloned())
932                }
933            })),
934        );
935        languages
936            .lock()
937            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
938            .insert("simple".to_string(), simple_with_resolver);
939        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
940        let platform_service = self
941            .platform_service
942            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
943        let health_registry = self.health_registry.unwrap_or_else(|| {
944            Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
945        });
946
947        let (controller, actor_join, supervision_join) =
948            if let Some(config) = self.supervision_config {
949                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
950                let mut controller_impl = if let Some(ref beans) = self.beans {
951                    DefaultRouteController::with_languages_and_beans(
952                        Arc::clone(&registry),
953                        Arc::clone(&languages),
954                        Arc::clone(&platform_service),
955                        Arc::clone(beans),
956                    )
957                } else {
958                    DefaultRouteController::with_languages(
959                        Arc::clone(&registry),
960                        Arc::clone(&languages),
961                        Arc::clone(&platform_service),
962                    )
963                };
964                if let Some(invoker) = self.function_invoker.clone() {
965                    controller_impl = controller_impl.with_function_invoker(invoker);
966                }
967                controller_impl.set_health_registry(Arc::clone(&health_registry));
968                controller_impl.set_crash_notifier(crash_tx);
969                let (controller, actor_join) = spawn_controller_actor(controller_impl);
970                let supervision_join = spawn_supervision_task(
971                    controller.clone(),
972                    config,
973                    Some(Arc::clone(&metrics)),
974                    crash_rx,
975                );
976                (controller, actor_join, Some(supervision_join))
977            } else {
978                let mut controller_impl = if let Some(ref beans) = self.beans {
979                    DefaultRouteController::with_languages_and_beans(
980                        Arc::clone(&registry),
981                        Arc::clone(&languages),
982                        Arc::clone(&platform_service),
983                        Arc::clone(beans),
984                    )
985                } else {
986                    DefaultRouteController::with_languages(
987                        Arc::clone(&registry),
988                        Arc::clone(&languages),
989                        Arc::clone(&platform_service),
990                    )
991                };
992                if let Some(invoker) = self.function_invoker.clone() {
993                    controller_impl = controller_impl.with_function_invoker(invoker);
994                }
995                controller_impl.set_health_registry(Arc::clone(&health_registry));
996                let (controller, actor_join) = spawn_controller_actor(controller_impl);
997                (controller, actor_join, None)
998            };
999
1000        let store = self.runtime_store.unwrap_or_default();
1001        let runtime = CamelContext::build_runtime(
1002            controller.clone(),
1003            store,
1004            self.execution_factory,
1005            Arc::clone(&health_registry),
1006        );
1007        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
1008        controller
1009            .try_set_runtime_handle(runtime_handle)
1010            .expect("controller actor mailbox should accept initial runtime handle"); // allow-unwrap
1011
1012        let template_registry = self
1013            .template_registry
1014            .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
1015
1016        Ok(CamelContext {
1017            registry,
1018            route_controller: controller,
1019            _actor_join: actor_join,
1020            supervision_join,
1021            runtime,
1022            cancel_token: CancellationToken::new(),
1023            metrics,
1024            platform_service,
1025            languages,
1026            shutdown_timeout: self.shutdown_timeout,
1027            services: self.lifecycle_services,
1028            health_registry,
1029            component_configs: HashMap::new(),
1030            function_invoker: self.function_invoker,
1031            template_registry,
1032        })
1033    }
1034}
1035
1036impl Default for CamelContextBuilder {
1037    fn default() -> Self {
1038        Self::new()
1039    }
1040}
1041
1042#[cfg(test)]
1043#[path = "context_tests.rs"]
1044mod context_tests;