Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
12    NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
13    RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23    DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
/// Process-wide counter consumed by `CamelContext::next_context_command_id`.
///
/// Every context in the process shares it, so each id built from it carries a
/// distinct sequence number.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
/// Builder for [`CamelContext`].
///
/// Every `Option` field left as `None` is replaced by a default in
/// [`build()`](CamelContextBuilder::build).
pub struct CamelContextBuilder {
    /// Component registry to use; `build()` creates an empty `Registry` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry to use; `build()` falls back to the built-in languages when unset.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build()` falls back to `NoOpMetrics` when unset.
    metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform ports
    /// Platform service; `build()` falls back to `NoopPlatformService` when unset.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build()` wires a crash notifier and spawns the supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Runtime store backing the runtime bus; `build()` uses the store's `Default` when unset.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Shutdown timeout copied into the built context (initialised to 30 seconds by `new()`).
    shutdown_timeout: std::time::Duration,
}
43
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Shared component registry (also handed to the route controller at build time).
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to talk to the route controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle returned by `spawn_controller_actor`; stored but never awaited in this file.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task; `Some` only when supervision was configured.
    /// Taken and aborted by `stop_timeout()` / `abort()`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Cancelled at the beginning of `stop_timeout()` and `abort()`.
    cancel_token: CancellationToken,
    /// Metrics collector; may be replaced by `with_lifecycle()`.
    metrics: Arc<dyn MetricsCollector>,
    // Platform ports
    /// Platform service providing the readiness gate, identity and leadership ports.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry, keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop()` forwards to `stop_timeout()`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services: started in insertion order by `start()`, stopped in reverse order.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configs keyed by the `TypeId` of the stored value (one value per type).
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
66
/// Opaque handle for runtime side-effect execution operations.
///
/// This intentionally does not expose direct lifecycle mutation APIs to callers.
/// Created by `CamelContext::runtime_execution_handle()`; cloning it clones the
/// controller handle and the `Arc` to the runtime bus.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Route controller handle (compile, pipeline swap, in-flight counters, ...).
    controller: RouteControllerHandle,
    /// Runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
}
75
76impl RuntimeExecutionHandle {
77    pub(crate) async fn add_route_definition(
78        &self,
79        definition: RouteDefinition,
80    ) -> Result<(), CamelError> {
81        use crate::lifecycle::ports::RouteRegistrationPort;
82        self.runtime.register_route(definition).await
83    }
84
85    pub(crate) async fn compile_route_definition(
86        &self,
87        definition: RouteDefinition,
88    ) -> Result<camel_api::BoxProcessor, CamelError> {
89        self.controller.compile_route_definition(definition).await
90    }
91
92    pub(crate) async fn swap_route_pipeline(
93        &self,
94        route_id: &str,
95        pipeline: camel_api::BoxProcessor,
96    ) -> Result<(), CamelError> {
97        self.controller.swap_pipeline(route_id, pipeline).await
98    }
99
100    pub(crate) async fn execute_runtime_command(
101        &self,
102        cmd: camel_api::RuntimeCommand,
103    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
104        self.runtime.execute(cmd).await
105    }
106
107    pub(crate) async fn runtime_route_status(
108        &self,
109        route_id: &str,
110    ) -> Result<Option<String>, CamelError> {
111        match self
112            .runtime
113            .ask(camel_api::RuntimeQuery::GetRouteStatus {
114                route_id: route_id.to_string(),
115            })
116            .await
117        {
118            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
119            Ok(_) => Err(CamelError::RouteError(
120                "unexpected runtime query response for route status".to_string(),
121            )),
122            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
123            Err(err) => Err(err),
124        }
125    }
126
127    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
128        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
129            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
130            Ok(_) => Err(CamelError::RouteError(
131                "unexpected runtime query response for route listing".to_string(),
132            )),
133            Err(err) => Err(err),
134        }
135    }
136
137    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
138        self.controller.route_source_hash(route_id).await
139    }
140
141    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
142        if !self.controller.route_exists(route_id).await? {
143            return Err(CamelError::RouteError(format!(
144                "Route '{}' not found",
145                route_id
146            )));
147        }
148        Ok(self
149            .controller
150            .in_flight_count(route_id)
151            .await?
152            .unwrap_or(0))
153    }
154
155    #[cfg(test)]
156    pub(crate) async fn force_start_route_for_test(
157        &self,
158        route_id: &str,
159    ) -> Result<(), CamelError> {
160        self.controller.start_route(route_id).await
161    }
162
163    #[cfg(test)]
164    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
165        self.controller.route_count().await.unwrap_or(0)
166    }
167}
168
169impl CamelContext {
170    fn built_in_languages() -> SharedLanguageRegistry {
171        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
172        languages.insert(
173            "simple".to_string(),
174            Arc::new(camel_language_simple::SimpleLanguage::new()),
175        );
176        #[cfg(feature = "lang-js")]
177        {
178            let js_lang = camel_language_js::JsLanguage::new();
179            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
180            languages.insert("javascript".to_string(), Arc::new(js_lang));
181        }
182        #[cfg(feature = "lang-rhai")]
183        {
184            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
185            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
186        }
187        #[cfg(feature = "lang-jsonpath")]
188        {
189            languages.insert(
190                "jsonpath".to_string(),
191                Arc::new(camel_language_jsonpath::JsonPathLanguage),
192            );
193        }
194        #[cfg(feature = "lang-xpath")]
195        {
196            languages.insert(
197                "xpath".to_string(),
198                Arc::new(camel_language_xpath::XPathLanguage),
199            );
200        }
201        Arc::new(std::sync::Mutex::new(languages))
202    }
203
204    fn build_runtime(
205        controller: RouteControllerHandle,
206        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
207    ) -> Arc<RuntimeBus> {
208        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
209        Arc::new(
210            RuntimeBus::new(
211                Arc::new(store.clone()),
212                Arc::new(store.clone()),
213                Arc::new(store.clone()),
214                Arc::new(store.clone()),
215            )
216            .with_uow(Arc::new(store))
217            .with_execution(execution),
218        )
219    }
220
    /// Returns a [`CamelContextBuilder`] with default settings
    /// (same as `CamelContextBuilder::new()`).
    pub fn builder() -> CamelContextBuilder {
        CamelContextBuilder::new()
    }
224
225    /// Set a global error handler applied to all routes without a per-route handler.
226    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
227        let _ = self.route_controller.set_error_handler(config).await;
228    }
229
230    /// Enable or disable tracing globally.
231    pub async fn set_tracing(&mut self, enabled: bool) {
232        let _ = self
233            .route_controller
234            .set_tracer_config(TracerConfig {
235                enabled,
236                ..Default::default()
237            })
238            .await;
239    }
240
241    /// Configure tracing with full config.
242    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
243        // Inject metrics collector if not already set
244        let config = if config.metrics_collector.is_none() {
245            TracerConfig {
246                metrics_collector: Some(Arc::clone(&self.metrics)),
247                ..config
248            }
249        } else {
250            config
251        };
252
253        let _ = self.route_controller.set_tracer_config(config).await;
254    }
255
    /// Builder-style: enable tracing with default config.
    ///
    /// Consumes the context, calls `set_tracing(true)` and returns the context.
    pub async fn with_tracing(mut self) -> Self {
        self.set_tracing(true).await;
        self
    }
261
    /// Builder-style: configure tracing with custom config.
    ///
    /// Consumes the context, applies `config` via `set_tracer_config` and
    /// returns the context.
    ///
    /// Note: tracing subscriber initialization (stdout/file output) is handled
    /// separately via init_tracing_subscriber (called in camel-config bridge).
    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
        self.set_tracer_config(config).await;
        self
    }
269
    /// Register a lifecycle service (Apache Camel: addService pattern)
    ///
    /// The service is appended to the context's service list. `start()` starts
    /// services in insertion order and shutdown stops them in reverse order.
    ///
    /// If `service.as_metrics_collector()` returns a collector, it replaces the
    /// context's current metrics collector.
    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
        // Auto-register MetricsCollector if available
        // NOTE(review): the supervision task spawned by the builder received the
        // collector that was current at build time; replacing `self.metrics`
        // here does not appear to reach it — confirm whether that is intended.
        if let Some(collector) = service.as_metrics_collector() {
            self.metrics = collector;
        }

        self.services.push(Box::new(service));
        self
    }
280
281    /// Register a component with this context.
282    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
283        info!(scheme = component.scheme(), "Registering component");
284        self.registry
285            .lock()
286            .expect("mutex poisoned: another thread panicked while holding this lock")
287            .register(Arc::new(component));
288    }
289
290    /// Register a language with this context, keyed by name.
291    ///
292    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
293    /// with the same name is already registered. Use
294    /// [`resolve_language`](Self::resolve_language) to check before
295    /// registering, or choose a distinct name.
296    pub fn register_language(
297        &mut self,
298        name: impl Into<String>,
299        lang: Box<dyn Language>,
300    ) -> Result<(), LanguageRegistryError> {
301        let name = name.into();
302        let mut languages = self
303            .languages
304            .lock()
305            .expect("mutex poisoned: another thread panicked while holding this lock");
306        if languages.contains_key(&name) {
307            return Err(LanguageRegistryError::AlreadyRegistered { name });
308        }
309        languages.insert(name, Arc::from(lang));
310        Ok(())
311    }
312
313    /// Resolve a language by name. Returns `None` if not registered.
314    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
315        let languages = self
316            .languages
317            .lock()
318            .expect("mutex poisoned: another thread panicked while holding this lock");
319        languages.get(name).cloned()
320    }
321
    /// Add a route definition to this context.
    ///
    /// The route must have an ID. Steps are resolved immediately using registered components.
    ///
    /// Logs the route's source URI and id, then registers the definition with
    /// the runtime bus and returns that call's result.
    pub async fn add_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<(), CamelError> {
        // Port trait imported locally; presumably it provides `register_route` below.
        use crate::lifecycle::ports::RouteRegistrationPort;
        info!(
            from = definition.from_uri(),
            route_id = %definition.route_id(),
            "Adding route definition"
        );
        self.runtime.register_route(definition).await
    }
337
    /// Builds a command id of the form `context:{op}:{route_id}:{seq}`.
    ///
    /// `seq` comes from the process-wide `CONTEXT_COMMAND_SEQ` counter, so each
    /// call yields a different sequence number.
    fn next_context_command_id(op: &str, route_id: &str) -> String {
        // `fetch_add` returns the previous value; the increment is atomic, so
        // concurrent callers never observe the same `seq`.
        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
        format!("context:{op}:{route_id}:{seq}")
    }
342
    /// Access the component registry.
    ///
    /// Returns the mutex guard itself, so the registry stays locked until the
    /// returned guard is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
    }
349
    /// Get runtime execution handle for file-watcher integrations.
    ///
    /// The handle holds a clone of the route controller handle and a new `Arc`
    /// reference to this context's runtime bus.
    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
        RuntimeExecutionHandle {
            controller: self.route_controller.clone(),
            runtime: Arc::clone(&self.runtime),
        }
    }
357
    /// Get the metrics collector.
    ///
    /// Returns a new `Arc` reference to the collector currently held by the context.
    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
362
    /// Get the platform service.
    ///
    /// Returns a new `Arc` reference to the platform service held by the context.
    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
367
    /// Get the readiness gate port.
    ///
    /// Forwards to `readiness_gate()` on the context's platform service.
    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
        self.platform_service.readiness_gate()
    }
372
    /// Get the platform identity.
    ///
    /// Forwards to `identity()` on the context's platform service.
    pub fn platform_identity(&self) -> PlatformIdentity {
        self.platform_service.identity()
    }
377
    /// Get the leadership service port.
    ///
    /// Forwards to `leadership()` on the context's platform service.
    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
        self.platform_service.leadership()
    }
382
    /// Get runtime command/query bus handle.
    ///
    /// Returns the context's runtime bus as a `RuntimeHandle` trait object.
    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
        // Method-call `clone()` yields `Arc<RuntimeBus>`, which is then
        // unsize-coerced to the trait object at the return site.
        self.runtime.clone()
    }
387
    /// Build a producer context wired to this runtime.
    ///
    /// Creates a fresh `ProducerContext` and attaches the handle returned by
    /// [`runtime()`](Self::runtime).
    pub fn producer_context(&self) -> camel_api::ProducerContext {
        camel_api::ProducerContext::new().with_runtime(self.runtime())
    }
392
393    /// Query route status via runtime read-model.
394    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
395        match self
396            .runtime()
397            .ask(camel_api::RuntimeQuery::GetRouteStatus {
398                route_id: route_id.to_string(),
399            })
400            .await
401        {
402            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
403            Ok(_) => Err(CamelError::RouteError(
404                "unexpected runtime query response for route status".to_string(),
405            )),
406            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
407            Err(err) => Err(err),
408        }
409    }
410
411    /// Start all routes. Each route's consumer will begin producing exchanges.
412    ///
413    /// Only routes with `auto_startup == true` will be started, in order of their
414    /// `startup_order` (lower values start first).
415    pub async fn start(&mut self) -> Result<(), CamelError> {
416        info!("Starting CamelContext");
417
418        // Start lifecycle services first
419        for (i, service) in self.services.iter_mut().enumerate() {
420            info!("Starting service: {}", service.name());
421            if let Err(e) = service.start().await {
422                // Rollback: stop already started services in reverse order
423                warn!(
424                    "Service {} failed to start, rolling back {} services",
425                    service.name(),
426                    i
427                );
428                for j in (0..i).rev() {
429                    if let Err(rollback_err) = self.services[j].stop().await {
430                        warn!(
431                            "Failed to stop service {} during rollback: {}",
432                            self.services[j].name(),
433                            rollback_err
434                        );
435                    }
436                }
437                return Err(e);
438            }
439        }
440
441        // Then start routes via runtime command bus (aggregate-first),
442        // preserving route controller startup ordering metadata.
443        let route_ids = self.route_controller.auto_startup_route_ids().await?;
444        for route_id in route_ids {
445            self.runtime
446                .execute(camel_api::RuntimeCommand::StartRoute {
447                    route_id: route_id.clone(),
448                    command_id: Self::next_context_command_id("start", &route_id),
449                    causation_id: None,
450                })
451                .await?;
452        }
453
454        info!("CamelContext started");
455        Ok(())
456    }
457
    /// Graceful shutdown using the context's configured shutdown timeout.
    ///
    /// The timeout is whatever [`shutdown_timeout()`](Self::shutdown_timeout)
    /// returns (the builder initialises it to 30 seconds). It is forwarded to
    /// [`stop_timeout()`](Self::stop_timeout), which currently ignores its
    /// timeout argument.
    pub async fn stop(&mut self) -> Result<(), CamelError> {
        self.stop_timeout(self.shutdown_timeout).await
    }
462
463    /// Graceful shutdown with custom timeout.
464    ///
465    /// Note: The timeout parameter is currently not used directly; the RouteController
466    /// manages its own shutdown timeout. This may change in a future version.
467    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
468        info!("Stopping CamelContext");
469
470        // Signal cancellation (for any legacy code that might use it)
471        self.cancel_token.cancel();
472        if let Some(join) = self.supervision_join.take() {
473            join.abort();
474        }
475
476        // Stop all routes via runtime command bus (aggregate-first),
477        // preserving route controller shutdown ordering metadata.
478        let route_ids = self.route_controller.shutdown_route_ids().await?;
479        for route_id in route_ids {
480            if let Err(err) = self
481                .runtime
482                .execute(camel_api::RuntimeCommand::StopRoute {
483                    route_id: route_id.clone(),
484                    command_id: Self::next_context_command_id("stop", &route_id),
485                    causation_id: None,
486                })
487                .await
488            {
489                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
490            }
491        }
492
493        // Then stop lifecycle services in reverse insertion order (LIFO)
494        // Continue stopping all services even if some fail
495        let mut first_error = None;
496        for service in self.services.iter_mut().rev() {
497            info!("Stopping service: {}", service.name());
498            if let Err(e) = service.stop().await {
499                warn!("Service {} failed to stop: {}", service.name(), e);
500                if first_error.is_none() {
501                    first_error = Some(e);
502                }
503            }
504        }
505
506        info!("CamelContext stopped");
507
508        if let Some(e) = first_error {
509            Err(e)
510        } else {
511            Ok(())
512        }
513    }
514
    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
    ///
    /// This is the value `stop()` forwards to `stop_timeout()`.
    pub fn shutdown_timeout(&self) -> std::time::Duration {
        self.shutdown_timeout
    }
519
    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
    ///
    /// Overwrites the value configured through the builder.
    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
        self.shutdown_timeout = timeout;
    }
524
525    /// Immediate abort — kills all tasks without draining.
526    pub async fn abort(&mut self) {
527        self.cancel_token.cancel();
528        if let Some(join) = self.supervision_join.take() {
529            join.abort();
530        }
531        let route_ids = self
532            .route_controller
533            .shutdown_route_ids()
534            .await
535            .unwrap_or_default();
536        for route_id in route_ids {
537            let _ = self
538                .runtime
539                .execute(camel_api::RuntimeCommand::StopRoute {
540                    route_id: route_id.clone(),
541                    command_id: Self::next_context_command_id("abort-stop", &route_id),
542                    causation_id: None,
543                })
544                .await;
545        }
546
547        for service in self.services.iter_mut().rev() {
548            let name = service.name().to_string();
549            match timeout(std::time::Duration::from_secs(5), service.stop()).await {
550                Ok(Ok(())) => info!("Aborted service: {}", name),
551                Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
552                Err(_) => warn!("Service {} timed out during abort (5s)", name),
553            }
554        }
555    }
556
557    /// Check health status of all registered services.
558    pub fn health_check(&self) -> HealthReport {
559        let services: Vec<ServiceHealth> = self
560            .services
561            .iter()
562            .map(|s| ServiceHealth {
563                name: s.name().to_string(),
564                status: s.status(),
565            })
566            .collect();
567
568        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
569            HealthStatus::Healthy
570        } else {
571            HealthStatus::Unhealthy
572        };
573
574        HealthReport {
575            status,
576            services,
577            ..Default::default()
578        }
579    }
580
581    /// Store a component config. Overwrites any previously stored config of the same type.
582    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
583        self.component_configs
584            .insert(TypeId::of::<T>(), Box::new(config));
585    }
586
587    /// Retrieve a stored component config by type. Returns None if not stored.
588    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
589        self.component_configs
590            .get(&TypeId::of::<T>())
591            .and_then(|b| b.downcast_ref::<T>())
592    }
593}
594
595impl ComponentRegistrar for CamelContext {
596    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
597        self.registry
598            .lock()
599            .expect("mutex poisoned: another thread panicked while holding this lock")
600            .register(component);
601    }
602}
603
604impl ComponentContext for CamelContext {
605    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
606        self.registry.lock().ok()?.get(scheme)
607    }
608
609    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
610        self.languages.lock().ok()?.get(name).cloned()
611    }
612
613    fn metrics(&self) -> Arc<dyn MetricsCollector> {
614        Arc::clone(&self.metrics)
615    }
616
617    fn platform_service(&self) -> Arc<dyn PlatformService> {
618        Arc::clone(&self.platform_service)
619    }
620}
621
622impl CamelContextBuilder {
623    pub fn new() -> Self {
624        Self {
625            registry: None,
626            languages: None,
627            metrics: None,
628            platform_service: None,
629            supervision_config: None,
630            runtime_store: None,
631            shutdown_timeout: std::time::Duration::from_secs(30),
632        }
633    }
634
635    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
636        self.registry = Some(registry);
637        self
638    }
639
640    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
641        self.languages = Some(languages);
642        self
643    }
644
645    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
646        self.metrics = Some(metrics);
647        self
648    }
649
650    /// Set a custom platform service.
651    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
652        self.platform_service = Some(platform_service);
653        self
654    }
655
656    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
657        self.supervision_config = Some(config);
658        self
659    }
660
661    pub fn runtime_store(
662        mut self,
663        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
664    ) -> Self {
665        self.runtime_store = Some(store);
666        self
667    }
668
669    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
670        self.shutdown_timeout = timeout;
671        self
672    }
673
674    pub async fn build(self) -> Result<CamelContext, CamelError> {
675        let registry = self
676            .registry
677            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
678        let languages = self
679            .languages
680            .unwrap_or_else(CamelContext::built_in_languages);
681        let simple_with_resolver: Arc<dyn Language> = Arc::new(
682            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
683                let languages = Arc::clone(&languages);
684                move |name| {
685                    languages
686                        .lock()
687                        .ok()
688                        .and_then(|registry| registry.get(name).cloned())
689                }
690            })),
691        );
692        languages
693            .lock()
694            .expect("mutex poisoned: another thread panicked while holding this lock")
695            .insert("simple".to_string(), simple_with_resolver);
696        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
697        let platform_service = self
698            .platform_service
699            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
700
701        let (controller, actor_join, supervision_join) =
702            if let Some(config) = self.supervision_config {
703                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
704                let mut controller_impl = DefaultRouteController::with_languages(
705                    Arc::clone(&registry),
706                    Arc::clone(&languages),
707                    Arc::clone(&platform_service),
708                );
709                controller_impl.set_crash_notifier(crash_tx);
710                let (controller, actor_join) = spawn_controller_actor(controller_impl);
711                let supervision_join = spawn_supervision_task(
712                    controller.clone(),
713                    config,
714                    Some(Arc::clone(&metrics)),
715                    crash_rx,
716                );
717                (controller, actor_join, Some(supervision_join))
718            } else {
719                let controller_impl = DefaultRouteController::with_languages(
720                    Arc::clone(&registry),
721                    Arc::clone(&languages),
722                    Arc::clone(&platform_service),
723                );
724                let (controller, actor_join) = spawn_controller_actor(controller_impl);
725                (controller, actor_join, None)
726            };
727
728        let store = self.runtime_store.unwrap_or_default();
729        let runtime = CamelContext::build_runtime(controller.clone(), store);
730        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
731        controller
732            .try_set_runtime_handle(runtime_handle)
733            .expect("controller actor mailbox should accept initial runtime handle");
734
735        Ok(CamelContext {
736            registry,
737            route_controller: controller,
738            _actor_join: actor_join,
739            supervision_join,
740            runtime,
741            cancel_token: CancellationToken::new(),
742            metrics,
743            platform_service,
744            languages,
745            shutdown_timeout: self.shutdown_timeout,
746            services: Vec::new(),
747            component_configs: HashMap::new(),
748        })
749    }
750}
751
/// `Default` is the same as [`CamelContextBuilder::new`].
impl Default for CamelContextBuilder {
    fn default() -> Self {
        Self::new()
    }
}
757
758#[cfg(test)]
759#[path = "context_tests.rs"]
760mod context_tests;