Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
12    NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
13    RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub struct CamelContextBuilder {
34    registry: Option<Arc<std::sync::Mutex<Registry>>>,
35    languages: Option<SharedLanguageRegistry>,
36    metrics: Option<Arc<dyn MetricsCollector>>,
37    // Platform ports
38    platform_service: Option<Arc<dyn PlatformService>>,
39    supervision_config: Option<SupervisionConfig>,
40    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
41    shutdown_timeout: std::time::Duration,
42}
43
44/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
45///
46/// # Lifecycle
47///
48/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
49/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
50/// stopped context is not supported — create a new instance instead.
51pub struct CamelContext {
52    registry: Arc<std::sync::Mutex<Registry>>,
53    route_controller: RouteControllerHandle,
54    _actor_join: tokio::task::JoinHandle<()>,
55    supervision_join: Option<tokio::task::JoinHandle<()>>,
56    runtime: Arc<RuntimeBus>,
57    cancel_token: CancellationToken,
58    metrics: Arc<dyn MetricsCollector>,
59    // Platform ports
60    platform_service: Arc<dyn PlatformService>,
61    languages: SharedLanguageRegistry,
62    shutdown_timeout: std::time::Duration,
63    services: Vec<Box<dyn Lifecycle>>,
64    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
65}
66
67/// Opaque handle for runtime side-effect execution operations.
68///
69/// This intentionally does not expose direct lifecycle mutation APIs to callers.
70#[derive(Clone)]
71pub struct RuntimeExecutionHandle {
72    controller: RouteControllerHandle,
73    runtime: Arc<RuntimeBus>,
74}
75
76impl RuntimeExecutionHandle {
77    pub(crate) async fn add_route_definition(
78        &self,
79        definition: RouteDefinition,
80    ) -> Result<(), CamelError> {
81        use crate::lifecycle::ports::RouteRegistrationPort;
82        self.runtime.register_route(definition).await
83    }
84
85    pub(crate) async fn compile_route_definition(
86        &self,
87        definition: RouteDefinition,
88    ) -> Result<camel_api::BoxProcessor, CamelError> {
89        self.controller.compile_route_definition(definition).await
90    }
91
92    pub(crate) async fn swap_route_pipeline(
93        &self,
94        route_id: &str,
95        pipeline: camel_api::BoxProcessor,
96    ) -> Result<(), CamelError> {
97        self.controller.swap_pipeline(route_id, pipeline).await
98    }
99
100    pub(crate) async fn execute_runtime_command(
101        &self,
102        cmd: camel_api::RuntimeCommand,
103    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
104        self.runtime.execute(cmd).await
105    }
106
107    pub(crate) async fn runtime_route_status(
108        &self,
109        route_id: &str,
110    ) -> Result<Option<String>, CamelError> {
111        match self
112            .runtime
113            .ask(camel_api::RuntimeQuery::GetRouteStatus {
114                route_id: route_id.to_string(),
115            })
116            .await
117        {
118            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
119            Ok(_) => Err(CamelError::RouteError(
120                "unexpected runtime query response for route status".to_string(),
121            )),
122            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
123            Err(err) => Err(err),
124        }
125    }
126
127    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
128        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
129            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
130            Ok(_) => Err(CamelError::RouteError(
131                "unexpected runtime query response for route listing".to_string(),
132            )),
133            Err(err) => Err(err),
134        }
135    }
136
137    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
138        self.controller.route_source_hash(route_id).await
139    }
140
141    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
142        if !self.controller.route_exists(route_id).await? {
143            return Err(CamelError::RouteError(format!(
144                "Route '{}' not found",
145                route_id
146            )));
147        }
148        Ok(self
149            .controller
150            .in_flight_count(route_id)
151            .await?
152            .unwrap_or(0))
153    }
154
155    #[cfg(test)]
156    pub(crate) async fn force_start_route_for_test(
157        &self,
158        route_id: &str,
159    ) -> Result<(), CamelError> {
160        self.controller.start_route(route_id).await
161    }
162
163    #[cfg(test)]
164    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
165        self.controller.route_count().await.unwrap_or(0)
166    }
167}
168
169impl CamelContext {
170    fn built_in_languages() -> SharedLanguageRegistry {
171        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
172        languages.insert(
173            "simple".to_string(),
174            Arc::new(camel_language_simple::SimpleLanguage::new()),
175        );
176        #[cfg(feature = "lang-js")]
177        {
178            let js_lang = camel_language_js::JsLanguage::new();
179            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
180            languages.insert("javascript".to_string(), Arc::new(js_lang));
181        }
182        #[cfg(feature = "lang-rhai")]
183        {
184            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
185            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
186        }
187        #[cfg(feature = "lang-jsonpath")]
188        {
189            languages.insert(
190                "jsonpath".to_string(),
191                Arc::new(camel_language_jsonpath::JsonPathLanguage),
192            );
193        }
194        #[cfg(feature = "lang-xpath")]
195        {
196            languages.insert(
197                "xpath".to_string(),
198                Arc::new(camel_language_xpath::XPathLanguage),
199            );
200        }
201        Arc::new(std::sync::Mutex::new(languages))
202    }
203
204    fn build_runtime(
205        controller: RouteControllerHandle,
206        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
207    ) -> Arc<RuntimeBus> {
208        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
209        Arc::new(
210            RuntimeBus::new(
211                Arc::new(store.clone()),
212                Arc::new(store.clone()),
213                Arc::new(store.clone()),
214                Arc::new(store.clone()),
215            )
216            .with_uow(Arc::new(store))
217            .with_execution(execution),
218        )
219    }
220
221    pub fn builder() -> CamelContextBuilder {
222        CamelContextBuilder::new()
223    }
224
225    /// Set a global error handler applied to all routes without a per-route handler.
226    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
227        let _ = self.route_controller.set_error_handler(config).await;
228    }
229
230    /// Enable or disable tracing globally.
231    pub async fn set_tracing(&mut self, enabled: bool) {
232        let _ = self
233            .route_controller
234            .set_tracer_config(TracerConfig {
235                enabled,
236                ..Default::default()
237            })
238            .await;
239    }
240
241    /// Configure tracing with full config.
242    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
243        // Inject metrics collector if not already set
244        let config = if config.metrics_collector.is_none() {
245            TracerConfig {
246                metrics_collector: Some(Arc::clone(&self.metrics)),
247                ..config
248            }
249        } else {
250            config
251        };
252
253        let _ = self.route_controller.set_tracer_config(config).await;
254    }
255
256    /// Builder-style: enable tracing with default config.
257    pub async fn with_tracing(mut self) -> Self {
258        self.set_tracing(true).await;
259        self
260    }
261
262    /// Builder-style: configure tracing with custom config.
263    /// Note: tracing subscriber initialization (stdout/file output) is handled
264    /// separately via init_tracing_subscriber (called in camel-config bridge).
265    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
266        self.set_tracer_config(config).await;
267        self
268    }
269
270    /// Register a lifecycle service (Apache Camel: addService pattern)
271    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
272        // Auto-register MetricsCollector if available
273        if let Some(collector) = service.as_metrics_collector() {
274            self.metrics = collector;
275        }
276
277        self.services.push(Box::new(service));
278        self
279    }
280
281    /// Register a component with this context.
282    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
283        info!(scheme = component.scheme(), "Registering component");
284        self.registry
285            .lock()
286            .expect("mutex poisoned: another thread panicked while holding this lock")
287            .register(Arc::new(component));
288    }
289
290    /// Register a language with this context, keyed by name.
291    ///
292    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
293    /// with the same name is already registered. Use
294    /// [`resolve_language`](Self::resolve_language) to check before
295    /// registering, or choose a distinct name.
296    pub fn register_language(
297        &mut self,
298        name: impl Into<String>,
299        lang: Box<dyn Language>,
300    ) -> Result<(), LanguageRegistryError> {
301        let name = name.into();
302        let mut languages = self
303            .languages
304            .lock()
305            .expect("mutex poisoned: another thread panicked while holding this lock");
306        if languages.contains_key(&name) {
307            return Err(LanguageRegistryError::AlreadyRegistered { name });
308        }
309        languages.insert(name, Arc::from(lang));
310        Ok(())
311    }
312
313    /// Resolve a language by name. Returns `None` if not registered.
314    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
315        let languages = self
316            .languages
317            .lock()
318            .expect("mutex poisoned: another thread panicked while holding this lock");
319        languages.get(name).cloned()
320    }
321
322    /// Add a route definition to this context.
323    ///
324    /// The route must have an ID. Steps are resolved immediately using registered components.
325    pub async fn add_route_definition(
326        &self,
327        definition: RouteDefinition,
328    ) -> Result<(), CamelError> {
329        use crate::lifecycle::ports::RouteRegistrationPort;
330        info!(
331            from = definition.from_uri(),
332            route_id = %definition.route_id(),
333            "Adding route definition"
334        );
335        self.runtime.register_route(definition).await
336    }
337
338    fn next_context_command_id(op: &str, route_id: &str) -> String {
339        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
340        format!("context:{op}:{route_id}:{seq}")
341    }
342
343    /// Access the component registry.
344    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
345        self.registry
346            .lock()
347            .expect("mutex poisoned: another thread panicked while holding this lock")
348    }
349
350    /// Access the shared component registry Arc.
351    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
352        Arc::clone(&self.registry)
353    }
354
355    /// Get runtime execution handle for file-watcher integrations.
356    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
357        RuntimeExecutionHandle {
358            controller: self.route_controller.clone(),
359            runtime: Arc::clone(&self.runtime),
360        }
361    }
362
363    /// Get the metrics collector.
364    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
365        Arc::clone(&self.metrics)
366    }
367
368    /// Get the platform service.
369    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
370        Arc::clone(&self.platform_service)
371    }
372
373    /// Get the readiness gate port.
374    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
375        self.platform_service.readiness_gate()
376    }
377
378    /// Get the platform identity.
379    pub fn platform_identity(&self) -> PlatformIdentity {
380        self.platform_service.identity()
381    }
382
383    /// Get the leadership service port.
384    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
385        self.platform_service.leadership()
386    }
387
388    /// Get runtime command/query bus handle.
389    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
390        self.runtime.clone()
391    }
392
393    /// Build a producer context wired to this runtime.
394    pub fn producer_context(&self) -> camel_api::ProducerContext {
395        camel_api::ProducerContext::new().with_runtime(self.runtime())
396    }
397
398    /// Query route status via runtime read-model.
399    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
400        match self
401            .runtime()
402            .ask(camel_api::RuntimeQuery::GetRouteStatus {
403                route_id: route_id.to_string(),
404            })
405            .await
406        {
407            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
408            Ok(_) => Err(CamelError::RouteError(
409                "unexpected runtime query response for route status".to_string(),
410            )),
411            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
412            Err(err) => Err(err),
413        }
414    }
415
416    /// Start all routes. Each route's consumer will begin producing exchanges.
417    ///
418    /// Only routes with `auto_startup == true` will be started, in order of their
419    /// `startup_order` (lower values start first).
420    pub async fn start(&mut self) -> Result<(), CamelError> {
421        info!("Starting CamelContext");
422
423        // Start lifecycle services first
424        for (i, service) in self.services.iter_mut().enumerate() {
425            info!("Starting service: {}", service.name());
426            if let Err(e) = service.start().await {
427                // Rollback: stop already started services in reverse order
428                warn!(
429                    "Service {} failed to start, rolling back {} services",
430                    service.name(),
431                    i
432                );
433                for j in (0..i).rev() {
434                    if let Err(rollback_err) = self.services[j].stop().await {
435                        warn!(
436                            "Failed to stop service {} during rollback: {}",
437                            self.services[j].name(),
438                            rollback_err
439                        );
440                    }
441                }
442                return Err(e);
443            }
444        }
445
446        // Then start routes via runtime command bus (aggregate-first),
447        // preserving route controller startup ordering metadata.
448        let route_ids = self.route_controller.auto_startup_route_ids().await?;
449        for route_id in route_ids {
450            self.runtime
451                .execute(camel_api::RuntimeCommand::StartRoute {
452                    route_id: route_id.clone(),
453                    command_id: Self::next_context_command_id("start", &route_id),
454                    causation_id: None,
455                })
456                .await?;
457        }
458
459        info!("CamelContext started");
460        Ok(())
461    }
462
463    /// Graceful shutdown with default 30-second timeout.
464    pub async fn stop(&mut self) -> Result<(), CamelError> {
465        self.stop_timeout(self.shutdown_timeout).await
466    }
467
468    /// Graceful shutdown with custom timeout.
469    ///
470    /// Note: The timeout parameter is currently not used directly; the RouteController
471    /// manages its own shutdown timeout. This may change in a future version.
472    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
473        info!("Stopping CamelContext");
474
475        // Signal cancellation (for any legacy code that might use it)
476        self.cancel_token.cancel();
477        if let Some(join) = self.supervision_join.take() {
478            join.abort();
479        }
480
481        // Stop all routes via runtime command bus (aggregate-first),
482        // preserving route controller shutdown ordering metadata.
483        let route_ids = self.route_controller.shutdown_route_ids().await?;
484        for route_id in route_ids {
485            if let Err(err) = self
486                .runtime
487                .execute(camel_api::RuntimeCommand::StopRoute {
488                    route_id: route_id.clone(),
489                    command_id: Self::next_context_command_id("stop", &route_id),
490                    causation_id: None,
491                })
492                .await
493            {
494                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
495            }
496        }
497
498        // Then stop lifecycle services in reverse insertion order (LIFO)
499        // Continue stopping all services even if some fail
500        let mut first_error = None;
501        for service in self.services.iter_mut().rev() {
502            info!("Stopping service: {}", service.name());
503            if let Err(e) = service.stop().await {
504                warn!("Service {} failed to stop: {}", service.name(), e);
505                if first_error.is_none() {
506                    first_error = Some(e);
507                }
508            }
509        }
510
511        info!("CamelContext stopped");
512
513        if let Some(e) = first_error {
514            Err(e)
515        } else {
516            Ok(())
517        }
518    }
519
520    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
521    pub fn shutdown_timeout(&self) -> std::time::Duration {
522        self.shutdown_timeout
523    }
524
525    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
526    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
527        self.shutdown_timeout = timeout;
528    }
529
530    /// Immediate abort — kills all tasks without draining.
531    pub async fn abort(&mut self) {
532        self.cancel_token.cancel();
533        if let Some(join) = self.supervision_join.take() {
534            join.abort();
535        }
536        let route_ids = self
537            .route_controller
538            .shutdown_route_ids()
539            .await
540            .unwrap_or_default();
541        for route_id in route_ids {
542            let _ = self
543                .runtime
544                .execute(camel_api::RuntimeCommand::StopRoute {
545                    route_id: route_id.clone(),
546                    command_id: Self::next_context_command_id("abort-stop", &route_id),
547                    causation_id: None,
548                })
549                .await;
550        }
551
552        for service in self.services.iter_mut().rev() {
553            let name = service.name().to_string();
554            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
555                Ok(Ok(())) => info!("Aborted service: {}", name),
556                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
557                Err(_) => warn!("Service {} timed out during abort (5s)", name),
558            }
559        }
560    }
561
562    /// Check health status of all registered services.
563    pub fn health_check(&self) -> HealthReport {
564        let services: Vec<ServiceHealth> = self
565            .services
566            .iter()
567            .map(|s| ServiceHealth {
568                name: s.name().to_string(),
569                status: s.status(),
570            })
571            .collect();
572
573        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
574            HealthStatus::Healthy
575        } else {
576            HealthStatus::Unhealthy
577        };
578
579        HealthReport {
580            status,
581            services,
582            ..Default::default()
583        }
584    }
585
586    /// Store a component config. Overwrites any previously stored config of the same type.
587    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
588        self.component_configs
589            .insert(TypeId::of::<T>(), Box::new(config));
590    }
591
592    /// Retrieve a stored component config by type. Returns None if not stored.
593    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
594        self.component_configs
595            .get(&TypeId::of::<T>())
596            .and_then(|b| b.downcast_ref::<T>())
597    }
598}
599
600impl ComponentRegistrar for CamelContext {
601    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
602        self.registry
603            .lock()
604            .expect("mutex poisoned: another thread panicked while holding this lock")
605            .register(component);
606    }
607}
608
609impl ComponentContext for CamelContext {
610    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
611        self.registry.lock().ok()?.get(scheme)
612    }
613
614    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
615        self.languages.lock().ok()?.get(name).cloned()
616    }
617
618    fn metrics(&self) -> Arc<dyn MetricsCollector> {
619        Arc::clone(&self.metrics)
620    }
621
622    fn platform_service(&self) -> Arc<dyn PlatformService> {
623        Arc::clone(&self.platform_service)
624    }
625}
626
627impl CamelContextBuilder {
628    pub fn new() -> Self {
629        Self {
630            registry: None,
631            languages: None,
632            metrics: None,
633            platform_service: None,
634            supervision_config: None,
635            runtime_store: None,
636            shutdown_timeout: std::time::Duration::from_secs(30),
637        }
638    }
639
640    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
641        self.registry = Some(registry);
642        self
643    }
644
645    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
646        self.languages = Some(languages);
647        self
648    }
649
650    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
651        self.metrics = Some(metrics);
652        self
653    }
654
655    /// Set a custom platform service.
656    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
657        self.platform_service = Some(platform_service);
658        self
659    }
660
661    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
662        self.supervision_config = Some(config);
663        self
664    }
665
666    pub fn runtime_store(
667        mut self,
668        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
669    ) -> Self {
670        self.runtime_store = Some(store);
671        self
672    }
673
674    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
675        self.shutdown_timeout = timeout;
676        self
677    }
678
679    pub async fn build(self) -> Result<CamelContext, CamelError> {
680        let registry = self
681            .registry
682            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
683        let languages = self
684            .languages
685            .unwrap_or_else(CamelContext::built_in_languages);
686        let simple_with_resolver: Arc<dyn Language> = Arc::new(
687            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
688                let languages = Arc::clone(&languages);
689                move |name| {
690                    languages
691                        .lock()
692                        .ok()
693                        .and_then(|registry| registry.get(name).cloned())
694                }
695            })),
696        );
697        languages
698            .lock()
699            .expect("mutex poisoned: another thread panicked while holding this lock")
700            .insert("simple".to_string(), simple_with_resolver);
701        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
702        let platform_service = self
703            .platform_service
704            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
705
706        let (controller, actor_join, supervision_join) =
707            if let Some(config) = self.supervision_config {
708                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
709                let mut controller_impl = DefaultRouteController::with_languages(
710                    Arc::clone(&registry),
711                    Arc::clone(&languages),
712                    Arc::clone(&platform_service),
713                );
714                controller_impl.set_crash_notifier(crash_tx);
715                let (controller, actor_join) = spawn_controller_actor(controller_impl);
716                let supervision_join = spawn_supervision_task(
717                    controller.clone(),
718                    config,
719                    Some(Arc::clone(&metrics)),
720                    crash_rx,
721                );
722                (controller, actor_join, Some(supervision_join))
723            } else {
724                let controller_impl = DefaultRouteController::with_languages(
725                    Arc::clone(&registry),
726                    Arc::clone(&languages),
727                    Arc::clone(&platform_service),
728                );
729                let (controller, actor_join) = spawn_controller_actor(controller_impl);
730                (controller, actor_join, None)
731            };
732
733        let store = self.runtime_store.unwrap_or_default();
734        let runtime = CamelContext::build_runtime(controller.clone(), store);
735        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
736        controller
737            .try_set_runtime_handle(runtime_handle)
738            .expect("controller actor mailbox should accept initial runtime handle");
739
740        Ok(CamelContext {
741            registry,
742            route_controller: controller,
743            _actor_join: actor_join,
744            supervision_join,
745            runtime,
746            cancel_token: CancellationToken::new(),
747            metrics,
748            platform_service,
749            languages,
750            shutdown_timeout: self.shutdown_timeout,
751            services: Vec::new(),
752            component_configs: HashMap::new(),
753        })
754    }
755}
756
757impl Default for CamelContextBuilder {
758    fn default() -> Self {
759        Self::new()
760    }
761}
762
763#[cfg(test)]
764#[path = "context_tests.rs"]
765mod context_tests;