Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, FunctionInvoker, HealthReport, HealthStatus, Lifecycle, MetricsCollector,
12    NoOpMetrics, NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate,
13    RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::lifecycle::ports::RuntimeExecutionPort;
29use crate::shared::components::domain::Registry;
30use crate::shared::observability::domain::TracerConfig;
31
/// Process-wide counter appended to command IDs issued by a `CamelContext`
/// (see `CamelContext::next_context_command_id`) so each ID is distinct.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Factory invoked once during `build_runtime` to create the runtime execution
/// port from the route controller handle, replacing the default adapter.
type ExecutionFactory =
    Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
36
/// Builder for [`CamelContext`].
///
/// Every `Option` field left as `None` is replaced by a default in
/// [`CamelContextBuilder::build`].
pub struct CamelContextBuilder {
    /// Component registry; a fresh `Registry` is created when absent.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry; the built-in language set is used when absent.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `NoOpMetrics` is used when absent.
    metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform ports
    /// Platform service; `NoopPlatformService` is used when absent.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build` wires a crash notifier and spawns a supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Backing store for the runtime bus; the store's `Default` is used when absent.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Timeout handed to the built context (30 seconds from `new`).
    shutdown_timeout: std::time::Duration,
    /// Optional shared bean registry passed to the route controller.
    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Function invoker extracted from a lifecycle service, if any.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Lifecycle services moved into the built context, in registration order.
    lifecycle_services: Vec<Box<dyn Lifecycle>>,
    /// Optional override for how the runtime execution port is created.
    execution_factory: Option<ExecutionFactory>,
}
51
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Shared component registry.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to send requests to the route controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle of the controller actor task; held but never read in this file.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task; present only when supervision was configured.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime command/query bus.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled at the start of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector shared with tracer configs that do not carry their own.
    metrics: Arc<dyn MetricsCollector>,
    // Platform ports
    /// Platform service providing readiness, identity and leadership ports.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout passed by `stop()` to `stop_timeout()`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services, started in order and stopped in reverse order.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configs keyed by the `TypeId` of the stored value.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Function invoker exposed through `RuntimeExecutionHandle`, if any.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
75
/// Opaque handle for runtime side-effect execution operations.
///
/// This intentionally does not expose direct lifecycle mutation APIs to callers.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Route controller handle used for compile/prepare/swap style operations.
    controller: RouteControllerHandle,
    /// Runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Function invoker copied from the context when the handle was created.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
85
86impl RuntimeExecutionHandle {
87    pub(crate) async fn add_route_definition(
88        &self,
89        definition: RouteDefinition,
90    ) -> Result<(), CamelError> {
91        use crate::lifecycle::ports::RouteRegistrationPort;
92        self.runtime
93            .register_route(definition)
94            .await
95            .map_err(Into::into)
96    }
97
    /// Asks the route controller to compile `definition` and returns the
    /// resulting processor.
    pub(crate) async fn compile_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, CamelError> {
        self.controller.compile_route_definition(definition).await
    }
104
    /// Asks the route controller to compile `definition`, forwarding
    /// `generation` unchanged.
    pub(crate) async fn compile_route_definition_with_generation(
        &self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<camel_api::BoxProcessor, CamelError> {
        self.controller
            .compile_route_definition_with_generation(definition, generation)
            .await
    }
114
    /// Asks the route controller to prepare `definition` for `generation`,
    /// returning a `PreparedRoute` that can later be passed to
    /// `insert_prepared_route`.
    pub(crate) async fn prepare_route_definition_with_generation(
        &self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
        self.controller
            .prepare_route_definition_with_generation(definition, generation)
            .await
    }
124
    /// Hands a previously prepared route to the route controller for insertion.
    pub(crate) async fn insert_prepared_route(
        &self,
        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
    ) -> Result<(), CamelError> {
        self.controller.insert_prepared_route(prepared).await
    }
131
    /// Forwards to the route controller's `remove_route_preserving_functions`
    /// for `route_id`.
    // NOTE(review): the name suggests function registrations survive the removal;
    // confirm against the controller implementation.
    pub(crate) async fn remove_route_preserving_functions(
        &self,
        route_id: String,
    ) -> Result<(), CamelError> {
        self.controller
            .remove_route_preserving_functions(route_id)
            .await
    }
140
    /// Forwards to the runtime bus's `register_aggregate_only` for `route_id`;
    /// the route controller is not involved in this call.
    pub(crate) async fn register_route_aggregate(
        &self,
        route_id: String,
    ) -> Result<(), CamelError> {
        self.runtime.register_aggregate_only(route_id).await
    }
147
    /// Asks the route controller to swap the pipeline of `route_id` for `pipeline`.
    pub(crate) async fn swap_route_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.controller.swap_pipeline(route_id, pipeline).await
    }
155
    /// Executes `cmd` on the runtime bus and returns its result.
    pub(crate) async fn execute_runtime_command(
        &self,
        cmd: camel_api::RuntimeCommand,
    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
        self.runtime.execute(cmd).await
    }
162
163    pub(crate) async fn runtime_route_status(
164        &self,
165        route_id: &str,
166    ) -> Result<Option<String>, CamelError> {
167        match self
168            .runtime
169            .ask(camel_api::RuntimeQuery::GetRouteStatus {
170                route_id: route_id.to_string(),
171            })
172            .await
173        {
174            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
175            Ok(_) => Err(CamelError::RouteError(
176                "unexpected runtime query response for route status".to_string(),
177            )),
178            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
179            Err(err) => Err(err),
180        }
181    }
182
183    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
184        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
185            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
186            Ok(_) => Err(CamelError::RouteError(
187                "unexpected runtime query response for route listing".to_string(),
188            )),
189            Err(err) => Err(err),
190        }
191    }
192
    /// Returns whatever the route controller reports as the source hash of
    /// `route_id`, or `None` when it reports none.
    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.controller.route_source_hash(route_id).await
    }
196
197    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
198        if !self.controller.route_exists(route_id).await? {
199            return Err(CamelError::RouteError(format!(
200                "Route '{}' not found",
201                route_id
202            )));
203        }
204        Ok(self
205            .controller
206            .in_flight_count(route_id)
207            .await?
208            .unwrap_or(0))
209    }
210
211    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
212        self.function_invoker.clone()
213    }
214
    /// Test-only: starts `route_id` directly on the route controller instead of
    /// issuing a command on the runtime bus.
    #[cfg(test)]
    pub(crate) async fn force_start_route_for_test(
        &self,
        route_id: &str,
    ) -> Result<(), CamelError> {
        self.controller.start_route(route_id).await
    }
222
    /// Returns the route count reported by the route controller, or `0` when
    /// the controller call fails.
    // NOTE(review): unlike `force_start_route_for_test`, this helper is `pub` and
    // is not gated behind `#[cfg(test)]`, so it is part of the public API —
    // presumably for tests in other crates; confirm this is intended.
    pub async fn controller_route_count_for_test(&self) -> usize {
        self.controller.route_count().await.unwrap_or(0)
    }
226}
227
228impl CamelContext {
229    fn built_in_languages() -> SharedLanguageRegistry {
230        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
231        languages.insert(
232            "simple".to_string(),
233            Arc::new(camel_language_simple::SimpleLanguage::new()),
234        );
235        #[cfg(feature = "lang-js")]
236        {
237            let js_lang = camel_language_js::JsLanguage::new();
238            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
239            languages.insert("javascript".to_string(), Arc::new(js_lang));
240        }
241        #[cfg(feature = "lang-rhai")]
242        {
243            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
244            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
245        }
246        #[cfg(feature = "lang-jsonpath")]
247        {
248            languages.insert(
249                "jsonpath".to_string(),
250                Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
251            );
252        }
253        #[cfg(feature = "lang-xpath")]
254        {
255            languages.insert(
256                "xpath".to_string(),
257                Arc::new(camel_language_xpath::XPathLanguage::new()),
258            );
259        }
260        Arc::new(std::sync::Mutex::new(languages))
261    }
262
263    fn build_runtime(
264        controller: RouteControllerHandle,
265        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
266        execution_factory: Option<ExecutionFactory>,
267    ) -> Arc<RuntimeBus> {
268        let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
269            factory(controller.clone())
270        } else {
271            Arc::new(RuntimeExecutionAdapter::new(controller))
272        };
273        Arc::new(
274            RuntimeBus::new(
275                Arc::new(store.clone()),
276                Arc::new(store.clone()),
277                Arc::new(store.clone()),
278                Arc::new(store.clone()),
279            )
280            .with_uow(Arc::new(store))
281            .with_execution(execution),
282        )
283    }
284
285    pub fn builder() -> CamelContextBuilder {
286        CamelContextBuilder::new()
287    }
288
289    /// Set a global error handler applied to all routes without a per-route handler.
290    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
291        let _ = self.route_controller.set_error_handler(config).await;
292    }
293
294    /// Enable or disable tracing globally.
295    pub async fn set_tracing(&mut self, enabled: bool) {
296        let _ = self
297            .route_controller
298            .set_tracer_config(TracerConfig {
299                enabled,
300                ..Default::default()
301            })
302            .await;
303    }
304
305    /// Configure tracing with full config.
306    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
307        // Inject metrics collector if not already set
308        let config = if config.metrics_collector.is_none() {
309            TracerConfig {
310                metrics_collector: Some(Arc::clone(&self.metrics)),
311                ..config
312            }
313        } else {
314            config
315        };
316
317        let _ = self.route_controller.set_tracer_config(config).await;
318    }
319
320    /// Builder-style: enable tracing with default config.
321    pub async fn with_tracing(mut self) -> Self {
322        self.set_tracing(true).await;
323        self
324    }
325
326    /// Builder-style: configure tracing with custom config.
327    /// Note: tracing subscriber initialization (stdout/file output) is handled
328    /// separately via init_tracing_subscriber (called in camel-config bridge).
329    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
330        self.set_tracer_config(config).await;
331        self
332    }
333
334    /// Register a lifecycle service (Apache Camel: addService pattern)
335    ///
336    /// For services exposing `as_function_invoker()`, the invoker is propagated
337    /// to the route controller so that subsequent route definitions with function
338    /// steps work correctly.
339    ///
340    /// Prefer [`CamelContextBuilder::with_lifecycle`] when possible, which wires
341    /// the invoker at build time before any routes are added.
342    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
343        if let Some(collector) = service.as_metrics_collector() {
344            self.metrics = collector;
345        }
346        if let Some(invoker) = service.as_function_invoker() {
347            self.function_invoker = Some(invoker.clone());
348            if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
349                tracing::warn!("Failed to propagate function invoker to route controller: {e}");
350            }
351        }
352
353        self.services.push(Box::new(service));
354        self
355    }
356
357    /// Register a component with this context.
358    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
359        info!(scheme = component.scheme(), "Registering component");
360        self.registry
361            .lock()
362            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
363            .register(Arc::new(component));
364    }
365
366    /// Register a language with this context, keyed by name.
367    ///
368    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
369    /// with the same name is already registered. Use
370    /// [`resolve_language`](Self::resolve_language) to check before
371    /// registering, or choose a distinct name.
372    pub fn register_language(
373        &mut self,
374        name: impl Into<String>,
375        lang: Box<dyn Language>,
376    ) -> Result<(), LanguageRegistryError> {
377        let name = name.into();
378        let mut languages = self
379            .languages
380            .lock()
381            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
382        if languages.contains_key(&name) {
383            return Err(LanguageRegistryError::AlreadyRegistered { name });
384        }
385        languages.insert(name, Arc::from(lang));
386        Ok(())
387    }
388
389    /// Resolve a language by name. Returns `None` if not registered.
390    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
391        let languages = self
392            .languages
393            .lock()
394            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
395        languages.get(name).cloned()
396    }
397
398    /// Add a route definition to this context.
399    ///
400    /// The route must have an ID. Steps are resolved immediately using registered components.
401    pub async fn add_route_definition(
402        &self,
403        definition: RouteDefinition,
404    ) -> Result<(), CamelError> {
405        use crate::lifecycle::ports::RouteRegistrationPort;
406        info!(
407            from = definition.from_uri(),
408            route_id = %definition.route_id(),
409            "Adding route definition"
410        );
411        self.runtime
412            .register_route(definition)
413            .await
414            .map_err(Into::into)
415    }
416
417    fn next_context_command_id(op: &str, route_id: &str) -> String {
418        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
419        format!("context:{op}:{route_id}:{seq}")
420    }
421
422    /// Access the component registry.
423    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
424        self.registry
425            .lock()
426            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
427    }
428
429    /// Access the shared component registry Arc.
430    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
431        Arc::clone(&self.registry)
432    }
433
434    /// Get runtime execution handle for file-watcher integrations.
435    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
436        RuntimeExecutionHandle {
437            controller: self.route_controller.clone(),
438            runtime: Arc::clone(&self.runtime),
439            function_invoker: self.function_invoker.clone(),
440        }
441    }
442
443    /// Get the metrics collector.
444    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
445        Arc::clone(&self.metrics)
446    }
447
448    /// Get the platform service.
449    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
450        Arc::clone(&self.platform_service)
451    }
452
    /// Get the readiness gate port, as provided by the platform service.
    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
        self.platform_service.readiness_gate()
    }
457
    /// Get the platform identity, as reported by the platform service.
    pub fn platform_identity(&self) -> PlatformIdentity {
        self.platform_service.identity()
    }
462
    /// Get the leadership service port, as provided by the platform service.
    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
        self.platform_service.leadership()
    }
467
    /// Get runtime command/query bus handle.
    ///
    /// Returns the context's `RuntimeBus` as a `RuntimeHandle` trait object.
    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
        // Method-call `clone()` yields `Arc<RuntimeBus>`, which then coerces to
        // the trait object at the return site.
        self.runtime.clone()
    }
472
473    /// Build a producer context wired to this runtime.
474    pub fn producer_context(&self) -> camel_api::ProducerContext {
475        camel_api::ProducerContext::new().with_runtime(self.runtime())
476    }
477
478    /// Query route status via runtime read-model.
479    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
480        match self
481            .runtime()
482            .ask(camel_api::RuntimeQuery::GetRouteStatus {
483                route_id: route_id.to_string(),
484            })
485            .await
486        {
487            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
488            Ok(_) => Err(CamelError::RouteError(
489                "unexpected runtime query response for route status".to_string(),
490            )),
491            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
492            Err(err) => Err(err),
493        }
494    }
495
    /// Start all routes. Each route's consumer will begin producing exchanges.
    ///
    /// Lifecycle services are started first, in registration order. If one fails,
    /// the services started before it are stopped in reverse order and the start
    /// error is returned.
    ///
    /// Only routes with `auto_startup == true` will be started, in order of their
    /// `startup_order` (lower values start first).
    ///
    /// # Errors
    ///
    /// Returns the first service start error, an error from listing the
    /// auto-startup routes, or the first `StartRoute` command error.
    pub async fn start(&mut self) -> Result<(), CamelError> {
        info!("Starting CamelContext");

        // Start lifecycle services first
        for (i, service) in self.services.iter_mut().enumerate() {
            info!("Starting service: {}", service.name());
            if let Err(e) = service.start().await {
                // Rollback: stop already started services in reverse order
                warn!(
                    "Service {} failed to start, rolling back {} services",
                    service.name(),
                    i
                );
                // Rollback failures are only logged; the original start error wins.
                for j in (0..i).rev() {
                    if let Err(rollback_err) = self.services[j].stop().await {
                        warn!(
                            "Failed to stop service {} during rollback: {}",
                            self.services[j].name(),
                            rollback_err
                        );
                    }
                }
                return Err(e);
            }
        }

        // Then start routes via runtime command bus (aggregate-first),
        // preserving route controller startup ordering metadata.
        // NOTE(review): a failure from here on returns immediately; services and
        // routes that were already started are left running for the caller to stop.
        let route_ids = self.route_controller.auto_startup_route_ids().await?;
        for route_id in route_ids {
            self.runtime
                .execute(camel_api::RuntimeCommand::StartRoute {
                    route_id: route_id.clone(),
                    command_id: Self::next_context_command_id("start", &route_id),
                    causation_id: None,
                })
                .await?;
        }

        info!("CamelContext started");
        Ok(())
    }
542
    /// Graceful shutdown using this context's configured shutdown timeout.
    ///
    /// The timeout is 30 seconds unless it was changed through the builder or
    /// [`set_shutdown_timeout`](Self::set_shutdown_timeout). It is passed straight
    /// to [`stop_timeout`](Self::stop_timeout).
    pub async fn stop(&mut self) -> Result<(), CamelError> {
        self.stop_timeout(self.shutdown_timeout).await
    }
547
548    /// Graceful shutdown with custom timeout.
549    ///
550    /// Note: The timeout parameter is currently not used directly; the RouteController
551    /// manages its own shutdown timeout. This may change in a future version.
552    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
553        info!("Stopping CamelContext");
554
555        // Signal cancellation (for any legacy code that might use it)
556        self.cancel_token.cancel();
557        if let Some(join) = self.supervision_join.take() {
558            join.abort();
559        }
560
561        // Stop all routes via runtime command bus (aggregate-first),
562        // preserving route controller shutdown ordering metadata.
563        let route_ids = self.route_controller.shutdown_route_ids().await?;
564        for route_id in route_ids {
565            if let Err(err) = self
566                .runtime
567                .execute(camel_api::RuntimeCommand::StopRoute {
568                    route_id: route_id.clone(),
569                    command_id: Self::next_context_command_id("stop", &route_id),
570                    causation_id: None,
571                })
572                .await
573            {
574                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
575            }
576        }
577
578        // Then stop lifecycle services in reverse insertion order (LIFO)
579        // Continue stopping all services even if some fail
580        let mut first_error = None;
581        for service in self.services.iter_mut().rev() {
582            info!("Stopping service: {}", service.name());
583            if let Err(e) = service.stop().await {
584                warn!("Service {} failed to stop: {}", service.name(), e);
585                if first_error.is_none() {
586                    first_error = Some(e);
587                }
588            }
589        }
590
591        info!("CamelContext stopped");
592
593        if let Some(e) = first_error {
594            Err(e)
595        } else {
596            Ok(())
597        }
598    }
599
    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
    ///
    /// This is the value `stop()` passes to `stop_timeout()`.
    pub fn shutdown_timeout(&self) -> std::time::Duration {
        self.shutdown_timeout
    }
604
    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
    ///
    /// Only later calls to `stop()` read the new value.
    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
        self.shutdown_timeout = timeout;
    }
609
610    /// Immediate abort — kills all tasks without draining.
611    pub async fn abort(&mut self) {
612        self.cancel_token.cancel();
613        if let Some(join) = self.supervision_join.take() {
614            join.abort();
615        }
616        let route_ids = self
617            .route_controller
618            .shutdown_route_ids()
619            .await
620            .unwrap_or_default();
621        for route_id in route_ids {
622            let _ = self
623                .runtime
624                .execute(camel_api::RuntimeCommand::StopRoute {
625                    route_id: route_id.clone(),
626                    command_id: Self::next_context_command_id("abort-stop", &route_id),
627                    causation_id: None,
628                })
629                .await;
630        }
631
632        for service in self.services.iter_mut().rev() {
633            let name = service.name().to_string();
634            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
635                Ok(Ok(())) => info!("Aborted service: {}", name),
636                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
637                Err(_) => warn!("Service {} timed out during abort (5s)", name),
638            }
639        }
640    }
641
642    /// Check health status of all registered services.
643    pub fn health_check(&self) -> HealthReport {
644        let services: Vec<ServiceHealth> = self
645            .services
646            .iter()
647            .map(|s| ServiceHealth {
648                name: s.name().to_string(),
649                status: s.status(),
650            })
651            .collect();
652
653        let has_failed = services.iter().any(|s| s.status == ServiceStatus::Failed);
654        let has_stopped = services.iter().any(|s| s.status == ServiceStatus::Stopped);
655        let status = if has_failed {
656            HealthStatus::Unhealthy
657        } else if has_stopped {
658            HealthStatus::Degraded
659        } else {
660            HealthStatus::Healthy
661        };
662
663        HealthReport {
664            status,
665            services,
666            ..Default::default()
667        }
668    }
669
670    /// Store a component config. Overwrites any previously stored config of the same type.
671    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
672        self.component_configs
673            .insert(TypeId::of::<T>(), Box::new(config));
674    }
675
676    /// Retrieve a stored component config by type. Returns None if not stored.
677    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
678        self.component_configs
679            .get(&TypeId::of::<T>())
680            .and_then(|b| b.downcast_ref::<T>())
681    }
682}
683
684impl ComponentRegistrar for CamelContext {
685    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
686        self.registry
687            .lock()
688            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
689            .register(component);
690    }
691}
692
693impl ComponentContext for CamelContext {
694    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
695        self.registry.lock().ok()?.get(scheme)
696    }
697
698    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
699        self.languages.lock().ok()?.get(name).cloned()
700    }
701
702    fn metrics(&self) -> Arc<dyn MetricsCollector> {
703        Arc::clone(&self.metrics)
704    }
705
706    fn platform_service(&self) -> Arc<dyn PlatformService> {
707        Arc::clone(&self.platform_service)
708    }
709}
710
711impl CamelContextBuilder {
712    pub fn new() -> Self {
713        Self {
714            registry: None,
715            languages: None,
716            metrics: None,
717            platform_service: None,
718            supervision_config: None,
719            runtime_store: None,
720            shutdown_timeout: std::time::Duration::from_secs(30),
721            beans: None,
722            function_invoker: None,
723            lifecycle_services: Vec::new(),
724            execution_factory: None,
725        }
726    }
727
728    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
729        self.registry = Some(registry);
730        self
731    }
732
733    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
734        self.languages = Some(languages);
735        self
736    }
737
738    pub fn with_execution_factory(
739        mut self,
740        factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
741    ) -> Self {
742        self.execution_factory = Some(Arc::new(factory));
743        self
744    }
745
746    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
747        self.metrics = Some(metrics);
748        self
749    }
750
751    /// Set a custom platform service.
752    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
753        self.platform_service = Some(platform_service);
754        self
755    }
756
757    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
758        self.supervision_config = Some(config);
759        self
760    }
761
762    pub fn runtime_store(
763        mut self,
764        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
765    ) -> Self {
766        self.runtime_store = Some(store);
767        self
768    }
769
770    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
771        self.shutdown_timeout = timeout;
772        self
773    }
774
775    /// Inject a shared `BeanRegistry` for bean resolution across routes.
776    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
777        self.beans = Some(beans);
778        self
779    }
780
781    /// Register a lifecycle service (e.g., FunctionRuntimeService) at builder time.
782    ///
783    /// This is the recommended path for services that need to be wired into the
784    /// route controller before any routes are added. The function invoker (if any)
785    /// is extracted and passed to the `DefaultRouteController` during `build()`.
786    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
787        if let Some(collector) = service.as_metrics_collector() {
788            self.metrics = Some(collector);
789        }
790        if let Some(invoker) = service.as_function_invoker() {
791            self.function_invoker = Some(invoker);
792        }
793        self.lifecycle_services.push(Box::new(service));
794        self
795    }
796
797    pub async fn build(self) -> Result<CamelContext, CamelError> {
798        let registry = self
799            .registry
800            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
801        let languages = self
802            .languages
803            .unwrap_or_else(CamelContext::built_in_languages);
804        let simple_with_resolver: Arc<dyn Language> = Arc::new(
805            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
806                let languages = Arc::clone(&languages);
807                move |name| {
808                    languages
809                        .lock()
810                        .ok()
811                        .and_then(|registry| registry.get(name).cloned())
812                }
813            })),
814        );
815        languages
816            .lock()
817            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
818            .insert("simple".to_string(), simple_with_resolver);
819        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
820        let platform_service = self
821            .platform_service
822            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
823
824        let (controller, actor_join, supervision_join) =
825            if let Some(config) = self.supervision_config {
826                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
827                let mut controller_impl = if let Some(ref beans) = self.beans {
828                    DefaultRouteController::with_languages_and_beans(
829                        Arc::clone(&registry),
830                        Arc::clone(&languages),
831                        Arc::clone(&platform_service),
832                        Arc::clone(beans),
833                    )
834                } else {
835                    DefaultRouteController::with_languages(
836                        Arc::clone(&registry),
837                        Arc::clone(&languages),
838                        Arc::clone(&platform_service),
839                    )
840                };
841                if let Some(invoker) = self.function_invoker.clone() {
842                    controller_impl = controller_impl.with_function_invoker(invoker);
843                }
844                controller_impl.set_crash_notifier(crash_tx);
845                let (controller, actor_join) = spawn_controller_actor(controller_impl);
846                let supervision_join = spawn_supervision_task(
847                    controller.clone(),
848                    config,
849                    Some(Arc::clone(&metrics)),
850                    crash_rx,
851                );
852                (controller, actor_join, Some(supervision_join))
853            } else {
854                let mut controller_impl = if let Some(ref beans) = self.beans {
855                    DefaultRouteController::with_languages_and_beans(
856                        Arc::clone(&registry),
857                        Arc::clone(&languages),
858                        Arc::clone(&platform_service),
859                        Arc::clone(beans),
860                    )
861                } else {
862                    DefaultRouteController::with_languages(
863                        Arc::clone(&registry),
864                        Arc::clone(&languages),
865                        Arc::clone(&platform_service),
866                    )
867                };
868                if let Some(invoker) = self.function_invoker.clone() {
869                    controller_impl = controller_impl.with_function_invoker(invoker);
870                }
871                let (controller, actor_join) = spawn_controller_actor(controller_impl);
872                (controller, actor_join, None)
873            };
874
875        let store = self.runtime_store.unwrap_or_default();
876        let runtime =
877            CamelContext::build_runtime(controller.clone(), store, self.execution_factory);
878        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
879        controller
880            .try_set_runtime_handle(runtime_handle)
881            .expect("controller actor mailbox should accept initial runtime handle"); // allow-unwrap
882
883        Ok(CamelContext {
884            registry,
885            route_controller: controller,
886            _actor_join: actor_join,
887            supervision_join,
888            runtime,
889            cancel_token: CancellationToken::new(),
890            metrics,
891            platform_service,
892            languages,
893            shutdown_timeout: self.shutdown_timeout,
894            services: self.lifecycle_services,
895            component_configs: HashMap::new(),
896            function_invoker: self.function_invoker,
897        })
898    }
899}
900
/// `Default` is equivalent to [`CamelContextBuilder::new`].
impl Default for CamelContextBuilder {
    fn default() -> Self {
        Self::new()
    }
}
906
// Unit tests for this module live in the sibling file `context_tests.rs` and are
// compiled only for test builds.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;