Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
12    NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
13    RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub struct CamelContextBuilder {
34    registry: Option<Arc<std::sync::Mutex<Registry>>>,
35    languages: Option<SharedLanguageRegistry>,
36    metrics: Option<Arc<dyn MetricsCollector>>,
37    // Platform ports
38    platform_service: Option<Arc<dyn PlatformService>>,
39    supervision_config: Option<SupervisionConfig>,
40    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
41    shutdown_timeout: std::time::Duration,
42    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
43}
44
45/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
46///
47/// # Lifecycle
48///
49/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
50/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
51/// stopped context is not supported — create a new instance instead.
52pub struct CamelContext {
53    registry: Arc<std::sync::Mutex<Registry>>,
54    route_controller: RouteControllerHandle,
55    _actor_join: tokio::task::JoinHandle<()>,
56    supervision_join: Option<tokio::task::JoinHandle<()>>,
57    runtime: Arc<RuntimeBus>,
58    cancel_token: CancellationToken,
59    metrics: Arc<dyn MetricsCollector>,
60    // Platform ports
61    platform_service: Arc<dyn PlatformService>,
62    languages: SharedLanguageRegistry,
63    shutdown_timeout: std::time::Duration,
64    services: Vec<Box<dyn Lifecycle>>,
65    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
66}
67
68/// Opaque handle for runtime side-effect execution operations.
69///
70/// This intentionally does not expose direct lifecycle mutation APIs to callers.
71#[derive(Clone)]
72pub struct RuntimeExecutionHandle {
73    controller: RouteControllerHandle,
74    runtime: Arc<RuntimeBus>,
75}
76
77impl RuntimeExecutionHandle {
78    pub(crate) async fn add_route_definition(
79        &self,
80        definition: RouteDefinition,
81    ) -> Result<(), CamelError> {
82        use crate::lifecycle::ports::RouteRegistrationPort;
83        self.runtime.register_route(definition).await
84    }
85
86    pub(crate) async fn compile_route_definition(
87        &self,
88        definition: RouteDefinition,
89    ) -> Result<camel_api::BoxProcessor, CamelError> {
90        self.controller.compile_route_definition(definition).await
91    }
92
93    pub(crate) async fn swap_route_pipeline(
94        &self,
95        route_id: &str,
96        pipeline: camel_api::BoxProcessor,
97    ) -> Result<(), CamelError> {
98        self.controller.swap_pipeline(route_id, pipeline).await
99    }
100
101    pub(crate) async fn execute_runtime_command(
102        &self,
103        cmd: camel_api::RuntimeCommand,
104    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
105        self.runtime.execute(cmd).await
106    }
107
108    pub(crate) async fn runtime_route_status(
109        &self,
110        route_id: &str,
111    ) -> Result<Option<String>, CamelError> {
112        match self
113            .runtime
114            .ask(camel_api::RuntimeQuery::GetRouteStatus {
115                route_id: route_id.to_string(),
116            })
117            .await
118        {
119            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
120            Ok(_) => Err(CamelError::RouteError(
121                "unexpected runtime query response for route status".to_string(),
122            )),
123            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
124            Err(err) => Err(err),
125        }
126    }
127
128    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
129        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
130            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
131            Ok(_) => Err(CamelError::RouteError(
132                "unexpected runtime query response for route listing".to_string(),
133            )),
134            Err(err) => Err(err),
135        }
136    }
137
138    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
139        self.controller.route_source_hash(route_id).await
140    }
141
142    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
143        if !self.controller.route_exists(route_id).await? {
144            return Err(CamelError::RouteError(format!(
145                "Route '{}' not found",
146                route_id
147            )));
148        }
149        Ok(self
150            .controller
151            .in_flight_count(route_id)
152            .await?
153            .unwrap_or(0))
154    }
155
156    #[cfg(test)]
157    pub(crate) async fn force_start_route_for_test(
158        &self,
159        route_id: &str,
160    ) -> Result<(), CamelError> {
161        self.controller.start_route(route_id).await
162    }
163
164    #[cfg(test)]
165    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
166        self.controller.route_count().await.unwrap_or(0)
167    }
168}
169
170impl CamelContext {
171    fn built_in_languages() -> SharedLanguageRegistry {
172        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
173        languages.insert(
174            "simple".to_string(),
175            Arc::new(camel_language_simple::SimpleLanguage::new()),
176        );
177        #[cfg(feature = "lang-js")]
178        {
179            let js_lang = camel_language_js::JsLanguage::new();
180            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
181            languages.insert("javascript".to_string(), Arc::new(js_lang));
182        }
183        #[cfg(feature = "lang-rhai")]
184        {
185            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
186            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
187        }
188        #[cfg(feature = "lang-jsonpath")]
189        {
190            languages.insert(
191                "jsonpath".to_string(),
192                Arc::new(camel_language_jsonpath::JsonPathLanguage),
193            );
194        }
195        #[cfg(feature = "lang-xpath")]
196        {
197            languages.insert(
198                "xpath".to_string(),
199                Arc::new(camel_language_xpath::XPathLanguage),
200            );
201        }
202        Arc::new(std::sync::Mutex::new(languages))
203    }
204
205    fn build_runtime(
206        controller: RouteControllerHandle,
207        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
208    ) -> Arc<RuntimeBus> {
209        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
210        Arc::new(
211            RuntimeBus::new(
212                Arc::new(store.clone()),
213                Arc::new(store.clone()),
214                Arc::new(store.clone()),
215                Arc::new(store.clone()),
216            )
217            .with_uow(Arc::new(store))
218            .with_execution(execution),
219        )
220    }
221
222    pub fn builder() -> CamelContextBuilder {
223        CamelContextBuilder::new()
224    }
225
226    /// Set a global error handler applied to all routes without a per-route handler.
227    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
228        let _ = self.route_controller.set_error_handler(config).await;
229    }
230
231    /// Enable or disable tracing globally.
232    pub async fn set_tracing(&mut self, enabled: bool) {
233        let _ = self
234            .route_controller
235            .set_tracer_config(TracerConfig {
236                enabled,
237                ..Default::default()
238            })
239            .await;
240    }
241
242    /// Configure tracing with full config.
243    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
244        // Inject metrics collector if not already set
245        let config = if config.metrics_collector.is_none() {
246            TracerConfig {
247                metrics_collector: Some(Arc::clone(&self.metrics)),
248                ..config
249            }
250        } else {
251            config
252        };
253
254        let _ = self.route_controller.set_tracer_config(config).await;
255    }
256
257    /// Builder-style: enable tracing with default config.
258    pub async fn with_tracing(mut self) -> Self {
259        self.set_tracing(true).await;
260        self
261    }
262
263    /// Builder-style: configure tracing with custom config.
264    /// Note: tracing subscriber initialization (stdout/file output) is handled
265    /// separately via init_tracing_subscriber (called in camel-config bridge).
266    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
267        self.set_tracer_config(config).await;
268        self
269    }
270
271    /// Register a lifecycle service (Apache Camel: addService pattern)
272    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
273        // Auto-register MetricsCollector if available
274        if let Some(collector) = service.as_metrics_collector() {
275            self.metrics = collector;
276        }
277
278        self.services.push(Box::new(service));
279        self
280    }
281
282    /// Register a component with this context.
283    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
284        info!(scheme = component.scheme(), "Registering component");
285        self.registry
286            .lock()
287            .expect("mutex poisoned: another thread panicked while holding this lock")
288            .register(Arc::new(component));
289    }
290
291    /// Register a language with this context, keyed by name.
292    ///
293    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
294    /// with the same name is already registered. Use
295    /// [`resolve_language`](Self::resolve_language) to check before
296    /// registering, or choose a distinct name.
297    pub fn register_language(
298        &mut self,
299        name: impl Into<String>,
300        lang: Box<dyn Language>,
301    ) -> Result<(), LanguageRegistryError> {
302        let name = name.into();
303        let mut languages = self
304            .languages
305            .lock()
306            .expect("mutex poisoned: another thread panicked while holding this lock");
307        if languages.contains_key(&name) {
308            return Err(LanguageRegistryError::AlreadyRegistered { name });
309        }
310        languages.insert(name, Arc::from(lang));
311        Ok(())
312    }
313
314    /// Resolve a language by name. Returns `None` if not registered.
315    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
316        let languages = self
317            .languages
318            .lock()
319            .expect("mutex poisoned: another thread panicked while holding this lock");
320        languages.get(name).cloned()
321    }
322
323    /// Add a route definition to this context.
324    ///
325    /// The route must have an ID. Steps are resolved immediately using registered components.
326    pub async fn add_route_definition(
327        &self,
328        definition: RouteDefinition,
329    ) -> Result<(), CamelError> {
330        use crate::lifecycle::ports::RouteRegistrationPort;
331        info!(
332            from = definition.from_uri(),
333            route_id = %definition.route_id(),
334            "Adding route definition"
335        );
336        self.runtime.register_route(definition).await
337    }
338
339    fn next_context_command_id(op: &str, route_id: &str) -> String {
340        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
341        format!("context:{op}:{route_id}:{seq}")
342    }
343
344    /// Access the component registry.
345    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
346        self.registry
347            .lock()
348            .expect("mutex poisoned: another thread panicked while holding this lock")
349    }
350
351    /// Access the shared component registry Arc.
352    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
353        Arc::clone(&self.registry)
354    }
355
356    /// Get runtime execution handle for file-watcher integrations.
357    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
358        RuntimeExecutionHandle {
359            controller: self.route_controller.clone(),
360            runtime: Arc::clone(&self.runtime),
361        }
362    }
363
364    /// Get the metrics collector.
365    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
366        Arc::clone(&self.metrics)
367    }
368
369    /// Get the platform service.
370    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
371        Arc::clone(&self.platform_service)
372    }
373
374    /// Get the readiness gate port.
375    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
376        self.platform_service.readiness_gate()
377    }
378
379    /// Get the platform identity.
380    pub fn platform_identity(&self) -> PlatformIdentity {
381        self.platform_service.identity()
382    }
383
384    /// Get the leadership service port.
385    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
386        self.platform_service.leadership()
387    }
388
389    /// Get runtime command/query bus handle.
390    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
391        self.runtime.clone()
392    }
393
394    /// Build a producer context wired to this runtime.
395    pub fn producer_context(&self) -> camel_api::ProducerContext {
396        camel_api::ProducerContext::new().with_runtime(self.runtime())
397    }
398
399    /// Query route status via runtime read-model.
400    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
401        match self
402            .runtime()
403            .ask(camel_api::RuntimeQuery::GetRouteStatus {
404                route_id: route_id.to_string(),
405            })
406            .await
407        {
408            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
409            Ok(_) => Err(CamelError::RouteError(
410                "unexpected runtime query response for route status".to_string(),
411            )),
412            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
413            Err(err) => Err(err),
414        }
415    }
416
417    /// Start all routes. Each route's consumer will begin producing exchanges.
418    ///
419    /// Only routes with `auto_startup == true` will be started, in order of their
420    /// `startup_order` (lower values start first).
421    pub async fn start(&mut self) -> Result<(), CamelError> {
422        info!("Starting CamelContext");
423
424        // Start lifecycle services first
425        for (i, service) in self.services.iter_mut().enumerate() {
426            info!("Starting service: {}", service.name());
427            if let Err(e) = service.start().await {
428                // Rollback: stop already started services in reverse order
429                warn!(
430                    "Service {} failed to start, rolling back {} services",
431                    service.name(),
432                    i
433                );
434                for j in (0..i).rev() {
435                    if let Err(rollback_err) = self.services[j].stop().await {
436                        warn!(
437                            "Failed to stop service {} during rollback: {}",
438                            self.services[j].name(),
439                            rollback_err
440                        );
441                    }
442                }
443                return Err(e);
444            }
445        }
446
447        // Then start routes via runtime command bus (aggregate-first),
448        // preserving route controller startup ordering metadata.
449        let route_ids = self.route_controller.auto_startup_route_ids().await?;
450        for route_id in route_ids {
451            self.runtime
452                .execute(camel_api::RuntimeCommand::StartRoute {
453                    route_id: route_id.clone(),
454                    command_id: Self::next_context_command_id("start", &route_id),
455                    causation_id: None,
456                })
457                .await?;
458        }
459
460        info!("CamelContext started");
461        Ok(())
462    }
463
464    /// Graceful shutdown with default 30-second timeout.
465    pub async fn stop(&mut self) -> Result<(), CamelError> {
466        self.stop_timeout(self.shutdown_timeout).await
467    }
468
469    /// Graceful shutdown with custom timeout.
470    ///
471    /// Note: The timeout parameter is currently not used directly; the RouteController
472    /// manages its own shutdown timeout. This may change in a future version.
473    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
474        info!("Stopping CamelContext");
475
476        // Signal cancellation (for any legacy code that might use it)
477        self.cancel_token.cancel();
478        if let Some(join) = self.supervision_join.take() {
479            join.abort();
480        }
481
482        // Stop all routes via runtime command bus (aggregate-first),
483        // preserving route controller shutdown ordering metadata.
484        let route_ids = self.route_controller.shutdown_route_ids().await?;
485        for route_id in route_ids {
486            if let Err(err) = self
487                .runtime
488                .execute(camel_api::RuntimeCommand::StopRoute {
489                    route_id: route_id.clone(),
490                    command_id: Self::next_context_command_id("stop", &route_id),
491                    causation_id: None,
492                })
493                .await
494            {
495                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
496            }
497        }
498
499        // Then stop lifecycle services in reverse insertion order (LIFO)
500        // Continue stopping all services even if some fail
501        let mut first_error = None;
502        for service in self.services.iter_mut().rev() {
503            info!("Stopping service: {}", service.name());
504            if let Err(e) = service.stop().await {
505                warn!("Service {} failed to stop: {}", service.name(), e);
506                if first_error.is_none() {
507                    first_error = Some(e);
508                }
509            }
510        }
511
512        info!("CamelContext stopped");
513
514        if let Some(e) = first_error {
515            Err(e)
516        } else {
517            Ok(())
518        }
519    }
520
521    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
522    pub fn shutdown_timeout(&self) -> std::time::Duration {
523        self.shutdown_timeout
524    }
525
526    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
527    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
528        self.shutdown_timeout = timeout;
529    }
530
531    /// Immediate abort — kills all tasks without draining.
532    pub async fn abort(&mut self) {
533        self.cancel_token.cancel();
534        if let Some(join) = self.supervision_join.take() {
535            join.abort();
536        }
537        let route_ids = self
538            .route_controller
539            .shutdown_route_ids()
540            .await
541            .unwrap_or_default();
542        for route_id in route_ids {
543            let _ = self
544                .runtime
545                .execute(camel_api::RuntimeCommand::StopRoute {
546                    route_id: route_id.clone(),
547                    command_id: Self::next_context_command_id("abort-stop", &route_id),
548                    causation_id: None,
549                })
550                .await;
551        }
552
553        for service in self.services.iter_mut().rev() {
554            let name = service.name().to_string();
555            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
556                Ok(Ok(())) => info!("Aborted service: {}", name),
557                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
558                Err(_) => warn!("Service {} timed out during abort (5s)", name),
559            }
560        }
561    }
562
563    /// Check health status of all registered services.
564    pub fn health_check(&self) -> HealthReport {
565        let services: Vec<ServiceHealth> = self
566            .services
567            .iter()
568            .map(|s| ServiceHealth {
569                name: s.name().to_string(),
570                status: s.status(),
571            })
572            .collect();
573
574        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
575            HealthStatus::Healthy
576        } else {
577            HealthStatus::Unhealthy
578        };
579
580        HealthReport {
581            status,
582            services,
583            ..Default::default()
584        }
585    }
586
587    /// Store a component config. Overwrites any previously stored config of the same type.
588    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
589        self.component_configs
590            .insert(TypeId::of::<T>(), Box::new(config));
591    }
592
593    /// Retrieve a stored component config by type. Returns None if not stored.
594    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
595        self.component_configs
596            .get(&TypeId::of::<T>())
597            .and_then(|b| b.downcast_ref::<T>())
598    }
599}
600
601impl ComponentRegistrar for CamelContext {
602    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
603        self.registry
604            .lock()
605            .expect("mutex poisoned: another thread panicked while holding this lock")
606            .register(component);
607    }
608}
609
610impl ComponentContext for CamelContext {
611    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
612        self.registry.lock().ok()?.get(scheme)
613    }
614
615    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
616        self.languages.lock().ok()?.get(name).cloned()
617    }
618
619    fn metrics(&self) -> Arc<dyn MetricsCollector> {
620        Arc::clone(&self.metrics)
621    }
622
623    fn platform_service(&self) -> Arc<dyn PlatformService> {
624        Arc::clone(&self.platform_service)
625    }
626}
627
628impl CamelContextBuilder {
629    pub fn new() -> Self {
630        Self {
631            registry: None,
632            languages: None,
633            metrics: None,
634            platform_service: None,
635            supervision_config: None,
636            runtime_store: None,
637            shutdown_timeout: std::time::Duration::from_secs(30),
638            beans: None,
639        }
640    }
641
642    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
643        self.registry = Some(registry);
644        self
645    }
646
647    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
648        self.languages = Some(languages);
649        self
650    }
651
652    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
653        self.metrics = Some(metrics);
654        self
655    }
656
657    /// Set a custom platform service.
658    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
659        self.platform_service = Some(platform_service);
660        self
661    }
662
663    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
664        self.supervision_config = Some(config);
665        self
666    }
667
668    pub fn runtime_store(
669        mut self,
670        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
671    ) -> Self {
672        self.runtime_store = Some(store);
673        self
674    }
675
676    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
677        self.shutdown_timeout = timeout;
678        self
679    }
680
681    /// Inject a shared `BeanRegistry` for bean resolution across routes.
682    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
683        self.beans = Some(beans);
684        self
685    }
686
687    pub async fn build(self) -> Result<CamelContext, CamelError> {
688        let registry = self
689            .registry
690            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
691        let languages = self
692            .languages
693            .unwrap_or_else(CamelContext::built_in_languages);
694        let simple_with_resolver: Arc<dyn Language> = Arc::new(
695            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
696                let languages = Arc::clone(&languages);
697                move |name| {
698                    languages
699                        .lock()
700                        .ok()
701                        .and_then(|registry| registry.get(name).cloned())
702                }
703            })),
704        );
705        languages
706            .lock()
707            .expect("mutex poisoned: another thread panicked while holding this lock")
708            .insert("simple".to_string(), simple_with_resolver);
709        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
710        let platform_service = self
711            .platform_service
712            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
713
714        let (controller, actor_join, supervision_join) =
715            if let Some(config) = self.supervision_config {
716                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
717                let mut controller_impl = if let Some(ref beans) = self.beans {
718                    DefaultRouteController::with_languages_and_beans(
719                        Arc::clone(&registry),
720                        Arc::clone(&languages),
721                        Arc::clone(&platform_service),
722                        Arc::clone(beans),
723                    )
724                } else {
725                    DefaultRouteController::with_languages(
726                        Arc::clone(&registry),
727                        Arc::clone(&languages),
728                        Arc::clone(&platform_service),
729                    )
730                };
731                controller_impl.set_crash_notifier(crash_tx);
732                let (controller, actor_join) = spawn_controller_actor(controller_impl);
733                let supervision_join = spawn_supervision_task(
734                    controller.clone(),
735                    config,
736                    Some(Arc::clone(&metrics)),
737                    crash_rx,
738                );
739                (controller, actor_join, Some(supervision_join))
740            } else {
741                let controller_impl = if let Some(ref beans) = self.beans {
742                    DefaultRouteController::with_languages_and_beans(
743                        Arc::clone(&registry),
744                        Arc::clone(&languages),
745                        Arc::clone(&platform_service),
746                        Arc::clone(beans),
747                    )
748                } else {
749                    DefaultRouteController::with_languages(
750                        Arc::clone(&registry),
751                        Arc::clone(&languages),
752                        Arc::clone(&platform_service),
753                    )
754                };
755                let (controller, actor_join) = spawn_controller_actor(controller_impl);
756                (controller, actor_join, None)
757            };
758
759        let store = self.runtime_store.unwrap_or_default();
760        let runtime = CamelContext::build_runtime(controller.clone(), store);
761        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
762        controller
763            .try_set_runtime_handle(runtime_handle)
764            .expect("controller actor mailbox should accept initial runtime handle");
765
766        Ok(CamelContext {
767            registry,
768            route_controller: controller,
769            _actor_join: actor_join,
770            supervision_join,
771            runtime,
772            cancel_token: CancellationToken::new(),
773            metrics,
774            platform_service,
775            languages,
776            shutdown_timeout: self.shutdown_timeout,
777            services: Vec::new(),
778            component_configs: HashMap::new(),
779        })
780    }
781}
782
783impl Default for CamelContextBuilder {
784    fn default() -> Self {
785        Self::new()
786    }
787}
788
789#[cfg(test)]
790#[path = "context_tests.rs"]
791mod context_tests;