Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, FunctionInvoker, HealthReport, HealthStatus, Lifecycle, MetricsCollector,
12    NoOpMetrics, NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate,
13    RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
// Process-wide counter whose value is appended to the command ids built by
// `CamelContext::next_context_command_id`.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
/// Builder for [`CamelContext`].
///
/// Every `Option` field that is still `None` when [`build()`](Self::build) runs
/// is replaced by the default described on the field.
pub struct CamelContextBuilder {
    /// Shared component registry; `build()` creates an empty `Registry` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Shared language registry; `build()` falls back to the built-in languages.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build()` falls back to `NoOpMetrics`.
    metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform ports
    /// Platform service; `build()` falls back to `NoopPlatformService`.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build()` wires a crash notifier and spawns a supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Runtime store backing the runtime bus; `build()` uses its `Default` when unset.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Copied into the built context; initialised to 30 seconds by `new()`.
    shutdown_timeout: std::time::Duration,
    /// Optional shared bean registry handed to the route controller.
    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Optional function invoker handed to the route controller and the context.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Lifecycle services moved into the built context, in registration order.
    lifecycle_services: Vec<Box<dyn Lifecycle>>,
}
46
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Shared component registry.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to talk to the route controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle returned by `spawn_controller_actor`; stored but not read in this file.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task; `Some` only when supervision was configured.
    /// Taken and aborted by `stop_timeout()` and `abort()`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Cancelled at the beginning of `stop_timeout()` and `abort()`.
    cancel_token: CancellationToken,
    /// Metrics collector exposed through `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    // Platform ports
    /// Platform service providing readiness gate, identity and leadership ports.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry, keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout value that `stop()` passes to `stop_timeout()`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services: started in insertion order, stopped in reverse order.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configs keyed by the `TypeId` of the stored value.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Optional function invoker, copied into every `RuntimeExecutionHandle`.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
70
/// Opaque handle for runtime side-effect execution operations.
///
/// This intentionally does not expose direct lifecycle mutation APIs to callers.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Clone of the context's route controller handle.
    controller: RouteControllerHandle,
    /// Shared runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Function invoker copied from the context at handle creation time, if any.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
80
81impl RuntimeExecutionHandle {
82    pub(crate) async fn add_route_definition(
83        &self,
84        definition: RouteDefinition,
85    ) -> Result<(), CamelError> {
86        use crate::lifecycle::ports::RouteRegistrationPort;
87        self.runtime.register_route(definition).await
88    }
89
90    pub(crate) async fn compile_route_definition(
91        &self,
92        definition: RouteDefinition,
93    ) -> Result<camel_api::BoxProcessor, CamelError> {
94        self.controller.compile_route_definition(definition).await
95    }
96
97    pub(crate) async fn compile_route_definition_with_generation(
98        &self,
99        definition: RouteDefinition,
100        generation: u64,
101    ) -> Result<camel_api::BoxProcessor, CamelError> {
102        self.controller
103            .compile_route_definition_with_generation(definition, generation)
104            .await
105    }
106
107    pub(crate) async fn prepare_route_definition_with_generation(
108        &self,
109        definition: RouteDefinition,
110        generation: u64,
111    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
112        self.controller
113            .prepare_route_definition_with_generation(definition, generation)
114            .await
115    }
116
117    pub(crate) async fn insert_prepared_route(
118        &self,
119        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
120    ) -> Result<(), CamelError> {
121        self.controller.insert_prepared_route(prepared).await
122    }
123
124    pub(crate) async fn remove_route_preserving_functions(
125        &self,
126        route_id: String,
127    ) -> Result<(), CamelError> {
128        self.controller
129            .remove_route_preserving_functions(route_id)
130            .await
131    }
132
133    pub(crate) async fn register_route_aggregate(
134        &self,
135        route_id: String,
136    ) -> Result<(), CamelError> {
137        self.runtime.register_aggregate_only(route_id).await
138    }
139
140    pub(crate) async fn swap_route_pipeline(
141        &self,
142        route_id: &str,
143        pipeline: camel_api::BoxProcessor,
144    ) -> Result<(), CamelError> {
145        self.controller.swap_pipeline(route_id, pipeline).await
146    }
147
148    pub(crate) async fn execute_runtime_command(
149        &self,
150        cmd: camel_api::RuntimeCommand,
151    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
152        self.runtime.execute(cmd).await
153    }
154
155    pub(crate) async fn runtime_route_status(
156        &self,
157        route_id: &str,
158    ) -> Result<Option<String>, CamelError> {
159        match self
160            .runtime
161            .ask(camel_api::RuntimeQuery::GetRouteStatus {
162                route_id: route_id.to_string(),
163            })
164            .await
165        {
166            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
167            Ok(_) => Err(CamelError::RouteError(
168                "unexpected runtime query response for route status".to_string(),
169            )),
170            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
171            Err(err) => Err(err),
172        }
173    }
174
175    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
176        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
177            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
178            Ok(_) => Err(CamelError::RouteError(
179                "unexpected runtime query response for route listing".to_string(),
180            )),
181            Err(err) => Err(err),
182        }
183    }
184
185    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
186        self.controller.route_source_hash(route_id).await
187    }
188
189    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
190        if !self.controller.route_exists(route_id).await? {
191            return Err(CamelError::RouteError(format!(
192                "Route '{}' not found",
193                route_id
194            )));
195        }
196        Ok(self
197            .controller
198            .in_flight_count(route_id)
199            .await?
200            .unwrap_or(0))
201    }
202
203    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
204        self.function_invoker.clone()
205    }
206
207    #[cfg(test)]
208    pub(crate) async fn force_start_route_for_test(
209        &self,
210        route_id: &str,
211    ) -> Result<(), CamelError> {
212        self.controller.start_route(route_id).await
213    }
214
215    pub async fn controller_route_count_for_test(&self) -> usize {
216        self.controller.route_count().await.unwrap_or(0)
217    }
218}
219
220impl CamelContext {
221    fn built_in_languages() -> SharedLanguageRegistry {
222        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
223        languages.insert(
224            "simple".to_string(),
225            Arc::new(camel_language_simple::SimpleLanguage::new()),
226        );
227        #[cfg(feature = "lang-js")]
228        {
229            let js_lang = camel_language_js::JsLanguage::new();
230            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
231            languages.insert("javascript".to_string(), Arc::new(js_lang));
232        }
233        #[cfg(feature = "lang-rhai")]
234        {
235            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
236            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
237        }
238        #[cfg(feature = "lang-jsonpath")]
239        {
240            languages.insert(
241                "jsonpath".to_string(),
242                Arc::new(camel_language_jsonpath::JsonPathLanguage),
243            );
244        }
245        #[cfg(feature = "lang-xpath")]
246        {
247            languages.insert(
248                "xpath".to_string(),
249                Arc::new(camel_language_xpath::XPathLanguage),
250            );
251        }
252        Arc::new(std::sync::Mutex::new(languages))
253    }
254
255    fn build_runtime(
256        controller: RouteControllerHandle,
257        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
258    ) -> Arc<RuntimeBus> {
259        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
260        Arc::new(
261            RuntimeBus::new(
262                Arc::new(store.clone()),
263                Arc::new(store.clone()),
264                Arc::new(store.clone()),
265                Arc::new(store.clone()),
266            )
267            .with_uow(Arc::new(store))
268            .with_execution(execution),
269        )
270    }
271
272    pub fn builder() -> CamelContextBuilder {
273        CamelContextBuilder::new()
274    }
275
276    /// Set a global error handler applied to all routes without a per-route handler.
277    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
278        let _ = self.route_controller.set_error_handler(config).await;
279    }
280
281    /// Enable or disable tracing globally.
282    pub async fn set_tracing(&mut self, enabled: bool) {
283        let _ = self
284            .route_controller
285            .set_tracer_config(TracerConfig {
286                enabled,
287                ..Default::default()
288            })
289            .await;
290    }
291
292    /// Configure tracing with full config.
293    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
294        // Inject metrics collector if not already set
295        let config = if config.metrics_collector.is_none() {
296            TracerConfig {
297                metrics_collector: Some(Arc::clone(&self.metrics)),
298                ..config
299            }
300        } else {
301            config
302        };
303
304        let _ = self.route_controller.set_tracer_config(config).await;
305    }
306
307    /// Builder-style: enable tracing with default config.
308    pub async fn with_tracing(mut self) -> Self {
309        self.set_tracing(true).await;
310        self
311    }
312
313    /// Builder-style: configure tracing with custom config.
314    /// Note: tracing subscriber initialization (stdout/file output) is handled
315    /// separately via init_tracing_subscriber (called in camel-config bridge).
316    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
317        self.set_tracer_config(config).await;
318        self
319    }
320
321    /// Register a lifecycle service (Apache Camel: addService pattern)
322    ///
323    /// For services exposing `as_function_invoker()`, the invoker is propagated
324    /// to the route controller so that subsequent route definitions with function
325    /// steps work correctly.
326    ///
327    /// Prefer [`CamelContextBuilder::with_lifecycle`] when possible, which wires
328    /// the invoker at build time before any routes are added.
329    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
330        if let Some(collector) = service.as_metrics_collector() {
331            self.metrics = collector;
332        }
333        if let Some(invoker) = service.as_function_invoker() {
334            self.function_invoker = Some(invoker.clone());
335            if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
336                tracing::warn!("Failed to propagate function invoker to route controller: {e}");
337            }
338        }
339
340        self.services.push(Box::new(service));
341        self
342    }
343
344    /// Register a component with this context.
345    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
346        info!(scheme = component.scheme(), "Registering component");
347        self.registry
348            .lock()
349            .expect("mutex poisoned: another thread panicked while holding this lock")
350            .register(Arc::new(component));
351    }
352
353    /// Register a language with this context, keyed by name.
354    ///
355    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
356    /// with the same name is already registered. Use
357    /// [`resolve_language`](Self::resolve_language) to check before
358    /// registering, or choose a distinct name.
359    pub fn register_language(
360        &mut self,
361        name: impl Into<String>,
362        lang: Box<dyn Language>,
363    ) -> Result<(), LanguageRegistryError> {
364        let name = name.into();
365        let mut languages = self
366            .languages
367            .lock()
368            .expect("mutex poisoned: another thread panicked while holding this lock");
369        if languages.contains_key(&name) {
370            return Err(LanguageRegistryError::AlreadyRegistered { name });
371        }
372        languages.insert(name, Arc::from(lang));
373        Ok(())
374    }
375
376    /// Resolve a language by name. Returns `None` if not registered.
377    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
378        let languages = self
379            .languages
380            .lock()
381            .expect("mutex poisoned: another thread panicked while holding this lock");
382        languages.get(name).cloned()
383    }
384
385    /// Add a route definition to this context.
386    ///
387    /// The route must have an ID. Steps are resolved immediately using registered components.
388    pub async fn add_route_definition(
389        &self,
390        definition: RouteDefinition,
391    ) -> Result<(), CamelError> {
392        use crate::lifecycle::ports::RouteRegistrationPort;
393        info!(
394            from = definition.from_uri(),
395            route_id = %definition.route_id(),
396            "Adding route definition"
397        );
398        self.runtime.register_route(definition).await
399    }
400
401    fn next_context_command_id(op: &str, route_id: &str) -> String {
402        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
403        format!("context:{op}:{route_id}:{seq}")
404    }
405
406    /// Access the component registry.
407    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
408        self.registry
409            .lock()
410            .expect("mutex poisoned: another thread panicked while holding this lock")
411    }
412
413    /// Access the shared component registry Arc.
414    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
415        Arc::clone(&self.registry)
416    }
417
418    /// Get runtime execution handle for file-watcher integrations.
419    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
420        RuntimeExecutionHandle {
421            controller: self.route_controller.clone(),
422            runtime: Arc::clone(&self.runtime),
423            function_invoker: self.function_invoker.clone(),
424        }
425    }
426
427    /// Get the metrics collector.
428    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
429        Arc::clone(&self.metrics)
430    }
431
432    /// Get the platform service.
433    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
434        Arc::clone(&self.platform_service)
435    }
436
437    /// Get the readiness gate port.
438    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
439        self.platform_service.readiness_gate()
440    }
441
442    /// Get the platform identity.
443    pub fn platform_identity(&self) -> PlatformIdentity {
444        self.platform_service.identity()
445    }
446
447    /// Get the leadership service port.
448    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
449        self.platform_service.leadership()
450    }
451
452    /// Get runtime command/query bus handle.
453    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
454        self.runtime.clone()
455    }
456
457    /// Build a producer context wired to this runtime.
458    pub fn producer_context(&self) -> camel_api::ProducerContext {
459        camel_api::ProducerContext::new().with_runtime(self.runtime())
460    }
461
462    /// Query route status via runtime read-model.
463    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
464        match self
465            .runtime()
466            .ask(camel_api::RuntimeQuery::GetRouteStatus {
467                route_id: route_id.to_string(),
468            })
469            .await
470        {
471            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
472            Ok(_) => Err(CamelError::RouteError(
473                "unexpected runtime query response for route status".to_string(),
474            )),
475            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
476            Err(err) => Err(err),
477        }
478    }
479
480    /// Start all routes. Each route's consumer will begin producing exchanges.
481    ///
482    /// Only routes with `auto_startup == true` will be started, in order of their
483    /// `startup_order` (lower values start first).
484    pub async fn start(&mut self) -> Result<(), CamelError> {
485        info!("Starting CamelContext");
486
487        // Start lifecycle services first
488        for (i, service) in self.services.iter_mut().enumerate() {
489            info!("Starting service: {}", service.name());
490            if let Err(e) = service.start().await {
491                // Rollback: stop already started services in reverse order
492                warn!(
493                    "Service {} failed to start, rolling back {} services",
494                    service.name(),
495                    i
496                );
497                for j in (0..i).rev() {
498                    if let Err(rollback_err) = self.services[j].stop().await {
499                        warn!(
500                            "Failed to stop service {} during rollback: {}",
501                            self.services[j].name(),
502                            rollback_err
503                        );
504                    }
505                }
506                return Err(e);
507            }
508        }
509
510        // Then start routes via runtime command bus (aggregate-first),
511        // preserving route controller startup ordering metadata.
512        let route_ids = self.route_controller.auto_startup_route_ids().await?;
513        for route_id in route_ids {
514            self.runtime
515                .execute(camel_api::RuntimeCommand::StartRoute {
516                    route_id: route_id.clone(),
517                    command_id: Self::next_context_command_id("start", &route_id),
518                    causation_id: None,
519                })
520                .await?;
521        }
522
523        info!("CamelContext started");
524        Ok(())
525    }
526
527    /// Graceful shutdown with default 30-second timeout.
528    pub async fn stop(&mut self) -> Result<(), CamelError> {
529        self.stop_timeout(self.shutdown_timeout).await
530    }
531
532    /// Graceful shutdown with custom timeout.
533    ///
534    /// Note: The timeout parameter is currently not used directly; the RouteController
535    /// manages its own shutdown timeout. This may change in a future version.
536    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
537        info!("Stopping CamelContext");
538
539        // Signal cancellation (for any legacy code that might use it)
540        self.cancel_token.cancel();
541        if let Some(join) = self.supervision_join.take() {
542            join.abort();
543        }
544
545        // Stop all routes via runtime command bus (aggregate-first),
546        // preserving route controller shutdown ordering metadata.
547        let route_ids = self.route_controller.shutdown_route_ids().await?;
548        for route_id in route_ids {
549            if let Err(err) = self
550                .runtime
551                .execute(camel_api::RuntimeCommand::StopRoute {
552                    route_id: route_id.clone(),
553                    command_id: Self::next_context_command_id("stop", &route_id),
554                    causation_id: None,
555                })
556                .await
557            {
558                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
559            }
560        }
561
562        // Then stop lifecycle services in reverse insertion order (LIFO)
563        // Continue stopping all services even if some fail
564        let mut first_error = None;
565        for service in self.services.iter_mut().rev() {
566            info!("Stopping service: {}", service.name());
567            if let Err(e) = service.stop().await {
568                warn!("Service {} failed to stop: {}", service.name(), e);
569                if first_error.is_none() {
570                    first_error = Some(e);
571                }
572            }
573        }
574
575        info!("CamelContext stopped");
576
577        if let Some(e) = first_error {
578            Err(e)
579        } else {
580            Ok(())
581        }
582    }
583
584    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
585    pub fn shutdown_timeout(&self) -> std::time::Duration {
586        self.shutdown_timeout
587    }
588
589    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
590    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
591        self.shutdown_timeout = timeout;
592    }
593
594    /// Immediate abort — kills all tasks without draining.
595    pub async fn abort(&mut self) {
596        self.cancel_token.cancel();
597        if let Some(join) = self.supervision_join.take() {
598            join.abort();
599        }
600        let route_ids = self
601            .route_controller
602            .shutdown_route_ids()
603            .await
604            .unwrap_or_default();
605        for route_id in route_ids {
606            let _ = self
607                .runtime
608                .execute(camel_api::RuntimeCommand::StopRoute {
609                    route_id: route_id.clone(),
610                    command_id: Self::next_context_command_id("abort-stop", &route_id),
611                    causation_id: None,
612                })
613                .await;
614        }
615
616        for service in self.services.iter_mut().rev() {
617            let name = service.name().to_string();
618            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
619                Ok(Ok(())) => info!("Aborted service: {}", name),
620                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
621                Err(_) => warn!("Service {} timed out during abort (5s)", name),
622            }
623        }
624    }
625
626    /// Check health status of all registered services.
627    pub fn health_check(&self) -> HealthReport {
628        let services: Vec<ServiceHealth> = self
629            .services
630            .iter()
631            .map(|s| ServiceHealth {
632                name: s.name().to_string(),
633                status: s.status(),
634            })
635            .collect();
636
637        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
638            HealthStatus::Healthy
639        } else {
640            HealthStatus::Unhealthy
641        };
642
643        HealthReport {
644            status,
645            services,
646            ..Default::default()
647        }
648    }
649
650    /// Store a component config. Overwrites any previously stored config of the same type.
651    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
652        self.component_configs
653            .insert(TypeId::of::<T>(), Box::new(config));
654    }
655
656    /// Retrieve a stored component config by type. Returns None if not stored.
657    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
658        self.component_configs
659            .get(&TypeId::of::<T>())
660            .and_then(|b| b.downcast_ref::<T>())
661    }
662}
663
664impl ComponentRegistrar for CamelContext {
665    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
666        self.registry
667            .lock()
668            .expect("mutex poisoned: another thread panicked while holding this lock")
669            .register(component);
670    }
671}
672
673impl ComponentContext for CamelContext {
674    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
675        self.registry.lock().ok()?.get(scheme)
676    }
677
678    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
679        self.languages.lock().ok()?.get(name).cloned()
680    }
681
682    fn metrics(&self) -> Arc<dyn MetricsCollector> {
683        Arc::clone(&self.metrics)
684    }
685
686    fn platform_service(&self) -> Arc<dyn PlatformService> {
687        Arc::clone(&self.platform_service)
688    }
689}
690
691impl CamelContextBuilder {
692    pub fn new() -> Self {
693        Self {
694            registry: None,
695            languages: None,
696            metrics: None,
697            platform_service: None,
698            supervision_config: None,
699            runtime_store: None,
700            shutdown_timeout: std::time::Duration::from_secs(30),
701            beans: None,
702            function_invoker: None,
703            lifecycle_services: Vec::new(),
704        }
705    }
706
707    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
708        self.registry = Some(registry);
709        self
710    }
711
712    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
713        self.languages = Some(languages);
714        self
715    }
716
717    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
718        self.metrics = Some(metrics);
719        self
720    }
721
722    /// Set a custom platform service.
723    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
724        self.platform_service = Some(platform_service);
725        self
726    }
727
728    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
729        self.supervision_config = Some(config);
730        self
731    }
732
733    pub fn runtime_store(
734        mut self,
735        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
736    ) -> Self {
737        self.runtime_store = Some(store);
738        self
739    }
740
741    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
742        self.shutdown_timeout = timeout;
743        self
744    }
745
746    /// Inject a shared `BeanRegistry` for bean resolution across routes.
747    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
748        self.beans = Some(beans);
749        self
750    }
751
752    /// Register a lifecycle service (e.g., FunctionRuntimeService) at builder time.
753    ///
754    /// This is the recommended path for services that need to be wired into the
755    /// route controller before any routes are added. The function invoker (if any)
756    /// is extracted and passed to the `DefaultRouteController` during `build()`.
757    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
758        if let Some(collector) = service.as_metrics_collector() {
759            self.metrics = Some(collector);
760        }
761        if let Some(invoker) = service.as_function_invoker() {
762            self.function_invoker = Some(invoker);
763        }
764        self.lifecycle_services.push(Box::new(service));
765        self
766    }
767
768    pub async fn build(self) -> Result<CamelContext, CamelError> {
769        let registry = self
770            .registry
771            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
772        let languages = self
773            .languages
774            .unwrap_or_else(CamelContext::built_in_languages);
775        let simple_with_resolver: Arc<dyn Language> = Arc::new(
776            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
777                let languages = Arc::clone(&languages);
778                move |name| {
779                    languages
780                        .lock()
781                        .ok()
782                        .and_then(|registry| registry.get(name).cloned())
783                }
784            })),
785        );
786        languages
787            .lock()
788            .expect("mutex poisoned: another thread panicked while holding this lock")
789            .insert("simple".to_string(), simple_with_resolver);
790        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
791        let platform_service = self
792            .platform_service
793            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
794
795        let (controller, actor_join, supervision_join) =
796            if let Some(config) = self.supervision_config {
797                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
798                let mut controller_impl = if let Some(ref beans) = self.beans {
799                    DefaultRouteController::with_languages_and_beans(
800                        Arc::clone(&registry),
801                        Arc::clone(&languages),
802                        Arc::clone(&platform_service),
803                        Arc::clone(beans),
804                    )
805                } else {
806                    DefaultRouteController::with_languages(
807                        Arc::clone(&registry),
808                        Arc::clone(&languages),
809                        Arc::clone(&platform_service),
810                    )
811                };
812                if let Some(invoker) = self.function_invoker.clone() {
813                    controller_impl = controller_impl.with_function_invoker(invoker);
814                }
815                controller_impl.set_crash_notifier(crash_tx);
816                let (controller, actor_join) = spawn_controller_actor(controller_impl);
817                let supervision_join = spawn_supervision_task(
818                    controller.clone(),
819                    config,
820                    Some(Arc::clone(&metrics)),
821                    crash_rx,
822                );
823                (controller, actor_join, Some(supervision_join))
824            } else {
825                let mut controller_impl = if let Some(ref beans) = self.beans {
826                    DefaultRouteController::with_languages_and_beans(
827                        Arc::clone(&registry),
828                        Arc::clone(&languages),
829                        Arc::clone(&platform_service),
830                        Arc::clone(beans),
831                    )
832                } else {
833                    DefaultRouteController::with_languages(
834                        Arc::clone(&registry),
835                        Arc::clone(&languages),
836                        Arc::clone(&platform_service),
837                    )
838                };
839                if let Some(invoker) = self.function_invoker.clone() {
840                    controller_impl = controller_impl.with_function_invoker(invoker);
841                }
842                let (controller, actor_join) = spawn_controller_actor(controller_impl);
843                (controller, actor_join, None)
844            };
845
846        let store = self.runtime_store.unwrap_or_default();
847        let runtime = CamelContext::build_runtime(controller.clone(), store);
848        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
849        controller
850            .try_set_runtime_handle(runtime_handle)
851            .expect("controller actor mailbox should accept initial runtime handle");
852
853        Ok(CamelContext {
854            registry,
855            route_controller: controller,
856            _actor_join: actor_join,
857            supervision_join,
858            runtime,
859            cancel_token: CancellationToken::new(),
860            metrics,
861            platform_service,
862            languages,
863            shutdown_timeout: self.shutdown_timeout,
864            services: self.lifecycle_services,
865            component_configs: HashMap::new(),
866            function_invoker: self.function_invoker,
867        })
868    }
869}
870
871impl Default for CamelContextBuilder {
872    fn default() -> Self {
873        Self::new()
874    }
875}
876
// Unit tests for this module live in the sibling file `context_tests.rs` and
// are compiled only for test builds.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;