1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, FunctionInvoker, HealthReport, HealthStatus, Lifecycle, MetricsCollector,
12 NoOpMetrics, NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate,
13 RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23 DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
/// Process-wide counter used by `CamelContext::next_context_command_id` to give
/// every generated command id a distinct trailing sequence number.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
/// Collects the optional collaborators of a [`CamelContext`] before it is built.
pub struct CamelContextBuilder {
    /// Component registry to use; `build` creates an empty `Registry` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry to use; `build` falls back to the built-in languages when unset.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build` falls back to `NoOpMetrics` when unset.
    metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service; `build` falls back to `NoopPlatformService` when unset.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build` wires a crash notifier and spawns a supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Runtime store backing the runtime bus; `build` uses the store's default when unset.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Shutdown timeout copied into the context (30 seconds from `new`).
    shutdown_timeout: std::time::Duration,
    /// Bean registry handed to the route controller when set.
    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Function invoker handed to the route controller and the context when set.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Lifecycle services that become the context's services, in registration order.
    lifecycle_services: Vec<Box<dyn Lifecycle>>,
}
46
/// Runtime context tying together the component registry, the route controller
/// actor, the runtime bus and the registered lifecycle services.
pub struct CamelContext {
    /// Shared component registry.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to talk to the route controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle returned by `spawn_controller_actor`; stored but not otherwise used in this file.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task, present only when supervision was configured.
    /// It is taken and aborted by `stop_timeout` and `abort`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used to register routes and to execute commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled at the beginning of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector exposed through `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service providing readiness gate, identity and leadership.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry, keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop` passes to `stop_timeout`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services, started in order by `start` and stopped in reverse order.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configuration values, at most one per concrete type (keyed by `TypeId`).
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Optional function invoker, also copied into `RuntimeExecutionHandle`.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
70
/// Cloneable handle bundling the route controller, the runtime bus and the
/// optional function invoker of a [`CamelContext`].
///
/// Created by `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Handle to the route controller actor.
    controller: RouteControllerHandle,
    /// Runtime bus shared with the originating context.
    runtime: Arc<RuntimeBus>,
    /// Function invoker captured when the handle was created, if any.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
80
81impl RuntimeExecutionHandle {
    /// Registers `definition` with the runtime bus through
    /// `RouteRegistrationPort::register_route` and returns its result.
    pub(crate) async fn add_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<(), CamelError> {
        // The trait must be in scope for `register_route` to resolve.
        use crate::lifecycle::ports::RouteRegistrationPort;
        self.runtime.register_route(definition).await
    }
89
    /// Asks the route controller to compile `definition` and returns the
    /// resulting processor, or the controller's error.
    pub(crate) async fn compile_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, CamelError> {
        self.controller.compile_route_definition(definition).await
    }
96
    /// Asks the route controller to compile `definition`, forwarding the
    /// caller-supplied `generation` value unchanged.
    pub(crate) async fn compile_route_definition_with_generation(
        &self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<camel_api::BoxProcessor, CamelError> {
        self.controller
            .compile_route_definition_with_generation(definition, generation)
            .await
    }
106
    /// Asks the route controller to prepare `definition` for the given
    /// `generation` and returns the `PreparedRoute` it produces.
    ///
    /// The result can be handed to `insert_prepared_route`.
    pub(crate) async fn prepare_route_definition_with_generation(
        &self,
        definition: RouteDefinition,
        generation: u64,
    ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
        self.controller
            .prepare_route_definition_with_generation(definition, generation)
            .await
    }
116
    /// Hands an already prepared route to the route controller for insertion.
    pub(crate) async fn insert_prepared_route(
        &self,
        prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
    ) -> Result<(), CamelError> {
        self.controller.insert_prepared_route(prepared).await
    }
123
    /// Forwards to the route controller's `remove_route_preserving_functions`
    /// for `route_id`.
    // NOTE(review): presumably the route's registered functions survive the removal — confirm in the controller.
    pub(crate) async fn remove_route_preserving_functions(
        &self,
        route_id: String,
    ) -> Result<(), CamelError> {
        self.controller
            .remove_route_preserving_functions(route_id)
            .await
    }
132
    /// Forwards `route_id` to the runtime bus's `register_aggregate_only`.
    ///
    /// Unlike `add_route_definition`, no route definition is passed along.
    pub(crate) async fn register_route_aggregate(
        &self,
        route_id: String,
    ) -> Result<(), CamelError> {
        self.runtime.register_aggregate_only(route_id).await
    }
139
    /// Asks the route controller to swap the pipeline of `route_id` for `pipeline`.
    pub(crate) async fn swap_route_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.controller.swap_pipeline(route_id, pipeline).await
    }
147
    /// Executes `cmd` on the runtime bus and returns the bus's result unchanged.
    pub(crate) async fn execute_runtime_command(
        &self,
        cmd: camel_api::RuntimeCommand,
    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
        self.runtime.execute(cmd).await
    }
154
155 pub(crate) async fn runtime_route_status(
156 &self,
157 route_id: &str,
158 ) -> Result<Option<String>, CamelError> {
159 match self
160 .runtime
161 .ask(camel_api::RuntimeQuery::GetRouteStatus {
162 route_id: route_id.to_string(),
163 })
164 .await
165 {
166 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
167 Ok(_) => Err(CamelError::RouteError(
168 "unexpected runtime query response for route status".to_string(),
169 )),
170 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
171 Err(err) => Err(err),
172 }
173 }
174
175 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
176 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
177 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
178 Ok(_) => Err(CamelError::RouteError(
179 "unexpected runtime query response for route listing".to_string(),
180 )),
181 Err(err) => Err(err),
182 }
183 }
184
    /// Returns the source hash the route controller reports for `route_id`, if any.
    // NOTE(review): presumably `None` covers both an unknown route and a route without a recorded hash — confirm.
    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.controller.route_source_hash(route_id).await
    }
188
189 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
190 if !self.controller.route_exists(route_id).await? {
191 return Err(CamelError::RouteError(format!(
192 "Route '{}' not found",
193 route_id
194 )));
195 }
196 Ok(self
197 .controller
198 .in_flight_count(route_id)
199 .await?
200 .unwrap_or(0))
201 }
202
    /// Returns a clone of the function invoker captured by this handle, if any.
    pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
        self.function_invoker.clone()
    }
206
    /// Test-only helper that starts `route_id` directly on the route
    /// controller instead of issuing a command through the runtime bus.
    #[cfg(test)]
    pub(crate) async fn force_start_route_for_test(
        &self,
        route_id: &str,
    ) -> Result<(), CamelError> {
        self.controller.start_route(route_id).await
    }
214
    /// Returns the number of routes the route controller reports, or `0` when
    /// the controller call fails.
    // NOTE(review): unlike `force_start_route_for_test` this is `pub` and not gated by `#[cfg(test)]` — confirm that is intended.
    pub async fn controller_route_count_for_test(&self) -> usize {
        self.controller.route_count().await.unwrap_or(0)
    }
218}
219
220impl CamelContext {
221 fn built_in_languages() -> SharedLanguageRegistry {
222 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
223 languages.insert(
224 "simple".to_string(),
225 Arc::new(camel_language_simple::SimpleLanguage::new()),
226 );
227 #[cfg(feature = "lang-js")]
228 {
229 let js_lang = camel_language_js::JsLanguage::new();
230 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
231 languages.insert("javascript".to_string(), Arc::new(js_lang));
232 }
233 #[cfg(feature = "lang-rhai")]
234 {
235 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
236 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
237 }
238 #[cfg(feature = "lang-jsonpath")]
239 {
240 languages.insert(
241 "jsonpath".to_string(),
242 Arc::new(camel_language_jsonpath::JsonPathLanguage),
243 );
244 }
245 #[cfg(feature = "lang-xpath")]
246 {
247 languages.insert(
248 "xpath".to_string(),
249 Arc::new(camel_language_xpath::XPathLanguage),
250 );
251 }
252 Arc::new(std::sync::Mutex::new(languages))
253 }
254
    /// Assembles the runtime bus for a context.
    ///
    /// Clones of `store` fill all four `RuntimeBus::new` arguments and `store`
    /// itself becomes the unit of work, so one store backs every role.
    /// Execution is delegated to a `RuntimeExecutionAdapter` wrapping `controller`.
    fn build_runtime(
        controller: RouteControllerHandle,
        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
    ) -> Arc<RuntimeBus> {
        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
        Arc::new(
            RuntimeBus::new(
                Arc::new(store.clone()),
                Arc::new(store.clone()),
                Arc::new(store.clone()),
                Arc::new(store.clone()),
            )
            .with_uow(Arc::new(store))
            .with_execution(execution),
        )
    }
271
    /// Returns a fresh [`CamelContextBuilder`].
    pub fn builder() -> CamelContextBuilder {
        CamelContextBuilder::new()
    }
275
276 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
278 let _ = self.route_controller.set_error_handler(config).await;
279 }
280
281 pub async fn set_tracing(&mut self, enabled: bool) {
283 let _ = self
284 .route_controller
285 .set_tracer_config(TracerConfig {
286 enabled,
287 ..Default::default()
288 })
289 .await;
290 }
291
292 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
294 let config = if config.metrics_collector.is_none() {
296 TracerConfig {
297 metrics_collector: Some(Arc::clone(&self.metrics)),
298 ..config
299 }
300 } else {
301 config
302 };
303
304 let _ = self.route_controller.set_tracer_config(config).await;
305 }
306
    /// Consuming variant of `set_tracing(true)`: enables tracing and returns the context.
    pub async fn with_tracing(mut self) -> Self {
        self.set_tracing(true).await;
        self
    }
312
    /// Consuming variant of `set_tracer_config`: applies `config` and returns the context.
    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
        self.set_tracer_config(config).await;
        self
    }
320
321 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
330 if let Some(collector) = service.as_metrics_collector() {
331 self.metrics = collector;
332 }
333 if let Some(invoker) = service.as_function_invoker() {
334 self.function_invoker = Some(invoker.clone());
335 if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
336 tracing::warn!("Failed to propagate function invoker to route controller: {e}");
337 }
338 }
339
340 self.services.push(Box::new(service));
341 self
342 }
343
344 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
346 info!(scheme = component.scheme(), "Registering component");
347 self.registry
348 .lock()
349 .expect("mutex poisoned: another thread panicked while holding this lock")
350 .register(Arc::new(component));
351 }
352
353 pub fn register_language(
360 &mut self,
361 name: impl Into<String>,
362 lang: Box<dyn Language>,
363 ) -> Result<(), LanguageRegistryError> {
364 let name = name.into();
365 let mut languages = self
366 .languages
367 .lock()
368 .expect("mutex poisoned: another thread panicked while holding this lock");
369 if languages.contains_key(&name) {
370 return Err(LanguageRegistryError::AlreadyRegistered { name });
371 }
372 languages.insert(name, Arc::from(lang));
373 Ok(())
374 }
375
376 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
378 let languages = self
379 .languages
380 .lock()
381 .expect("mutex poisoned: another thread panicked while holding this lock");
382 languages.get(name).cloned()
383 }
384
    /// Logs the route's source URI and id, then registers `definition` with
    /// the runtime bus through `RouteRegistrationPort::register_route`.
    pub async fn add_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<(), CamelError> {
        // The trait must be in scope for `register_route` to resolve.
        use crate::lifecycle::ports::RouteRegistrationPort;
        info!(
            from = definition.from_uri(),
            route_id = %definition.route_id(),
            "Adding route definition"
        );
        self.runtime.register_route(definition).await
    }
400
    /// Builds a command id of the form `context:{op}:{route_id}:{seq}`, where
    /// `seq` is the next value of the process-wide `CONTEXT_COMMAND_SEQ` counter.
    fn next_context_command_id(op: &str, route_id: &str) -> String {
        // The counter is only incremented and read here, to make ids distinct.
        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
        format!("context:{op}:{route_id}:{seq}")
    }
405
    /// Locks the component registry and returns the guard.
    ///
    /// The registry stays locked for as long as the returned guard is alive.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
    }
412
    /// Returns another shared handle to the component registry, without locking it.
    pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
        Arc::clone(&self.registry)
    }
417
    /// Creates a [`RuntimeExecutionHandle`] that shares this context's route
    /// controller handle and runtime bus.
    ///
    /// The function invoker is copied as it is now; the handle does not
    /// observe invokers set on the context afterwards.
    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
        RuntimeExecutionHandle {
            controller: self.route_controller.clone(),
            runtime: Arc::clone(&self.runtime),
            function_invoker: self.function_invoker.clone(),
        }
    }
426
    /// Returns a shared handle to the context's metrics collector.
    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
431
    /// Returns a shared handle to the context's platform service.
    pub fn platform_service(&self) -> Arc<dyn PlatformService> {
        Arc::clone(&self.platform_service)
    }
436
    /// Returns the readiness gate provided by the platform service.
    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
        self.platform_service.readiness_gate()
    }
441
    /// Returns the identity reported by the platform service.
    pub fn platform_identity(&self) -> PlatformIdentity {
        self.platform_service.identity()
    }
446
    /// Returns the leadership service provided by the platform service.
    pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
        self.platform_service.leadership()
    }
451
    /// Returns the runtime bus as a type-erased `RuntimeHandle`.
    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
        self.runtime.clone()
    }
456
    /// Creates a new `ProducerContext` with this context's runtime handle attached.
    pub fn producer_context(&self) -> camel_api::ProducerContext {
        camel_api::ProducerContext::new().with_runtime(self.runtime())
    }
461
462 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
464 match self
465 .runtime()
466 .ask(camel_api::RuntimeQuery::GetRouteStatus {
467 route_id: route_id.to_string(),
468 })
469 .await
470 {
471 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
472 Ok(_) => Err(CamelError::RouteError(
473 "unexpected runtime query response for route status".to_string(),
474 )),
475 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
476 Err(err) => Err(err),
477 }
478 }
479
480 pub async fn start(&mut self) -> Result<(), CamelError> {
485 info!("Starting CamelContext");
486
487 for (i, service) in self.services.iter_mut().enumerate() {
489 info!("Starting service: {}", service.name());
490 if let Err(e) = service.start().await {
491 warn!(
493 "Service {} failed to start, rolling back {} services",
494 service.name(),
495 i
496 );
497 for j in (0..i).rev() {
498 if let Err(rollback_err) = self.services[j].stop().await {
499 warn!(
500 "Failed to stop service {} during rollback: {}",
501 self.services[j].name(),
502 rollback_err
503 );
504 }
505 }
506 return Err(e);
507 }
508 }
509
510 let route_ids = self.route_controller.auto_startup_route_ids().await?;
513 for route_id in route_ids {
514 self.runtime
515 .execute(camel_api::RuntimeCommand::StartRoute {
516 route_id: route_id.clone(),
517 command_id: Self::next_context_command_id("start", &route_id),
518 causation_id: None,
519 })
520 .await?;
521 }
522
523 info!("CamelContext started");
524 Ok(())
525 }
526
    /// Stops the context by calling `stop_timeout` with the configured `shutdown_timeout`.
    pub async fn stop(&mut self) -> Result<(), CamelError> {
        self.stop_timeout(self.shutdown_timeout).await
    }
531
532 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
537 info!("Stopping CamelContext");
538
539 self.cancel_token.cancel();
541 if let Some(join) = self.supervision_join.take() {
542 join.abort();
543 }
544
545 let route_ids = self.route_controller.shutdown_route_ids().await?;
548 for route_id in route_ids {
549 if let Err(err) = self
550 .runtime
551 .execute(camel_api::RuntimeCommand::StopRoute {
552 route_id: route_id.clone(),
553 command_id: Self::next_context_command_id("stop", &route_id),
554 causation_id: None,
555 })
556 .await
557 {
558 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
559 }
560 }
561
562 let mut first_error = None;
565 for service in self.services.iter_mut().rev() {
566 info!("Stopping service: {}", service.name());
567 if let Err(e) = service.stop().await {
568 warn!("Service {} failed to stop: {}", service.name(), e);
569 if first_error.is_none() {
570 first_error = Some(e);
571 }
572 }
573 }
574
575 info!("CamelContext stopped");
576
577 if let Some(e) = first_error {
578 Err(e)
579 } else {
580 Ok(())
581 }
582 }
583
    /// Returns the timeout that `stop` passes to `stop_timeout`.
    pub fn shutdown_timeout(&self) -> std::time::Duration {
        self.shutdown_timeout
    }
588
    /// Replaces the timeout that `stop` passes to `stop_timeout`.
    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
        self.shutdown_timeout = timeout;
    }
593
594 pub async fn abort(&mut self) {
596 self.cancel_token.cancel();
597 if let Some(join) = self.supervision_join.take() {
598 join.abort();
599 }
600 let route_ids = self
601 .route_controller
602 .shutdown_route_ids()
603 .await
604 .unwrap_or_default();
605 for route_id in route_ids {
606 let _ = self
607 .runtime
608 .execute(camel_api::RuntimeCommand::StopRoute {
609 route_id: route_id.clone(),
610 command_id: Self::next_context_command_id("abort-stop", &route_id),
611 causation_id: None,
612 })
613 .await;
614 }
615
616 for service in self.services.iter_mut().rev() {
617 let name = service.name().to_string();
618 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
619 Ok(Ok(())) => info!("Aborted service: {}", name),
620 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
621 Err(_) => warn!("Service {} timed out during abort (5s)", name),
622 }
623 }
624 }
625
626 pub fn health_check(&self) -> HealthReport {
628 let services: Vec<ServiceHealth> = self
629 .services
630 .iter()
631 .map(|s| ServiceHealth {
632 name: s.name().to_string(),
633 status: s.status(),
634 })
635 .collect();
636
637 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
638 HealthStatus::Healthy
639 } else {
640 HealthStatus::Unhealthy
641 };
642
643 HealthReport {
644 status,
645 services,
646 ..Default::default()
647 }
648 }
649
    /// Stores `config` keyed by its concrete type, replacing any value of the
    /// same type stored earlier.
    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
        self.component_configs
            .insert(TypeId::of::<T>(), Box::new(config));
    }
655
656 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
658 self.component_configs
659 .get(&TypeId::of::<T>())
660 .and_then(|b| b.downcast_ref::<T>())
661 }
662}
663
/// Lets callers register an already type-erased component on the context.
impl ComponentRegistrar for CamelContext {
    /// Adds `component` to the registry. Unlike `CamelContext::register_component`,
    /// nothing is logged.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
            .register(component);
    }
}
672
673impl ComponentContext for CamelContext {
674 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
675 self.registry.lock().ok()?.get(scheme)
676 }
677
678 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
679 self.languages.lock().ok()?.get(name).cloned()
680 }
681
682 fn metrics(&self) -> Arc<dyn MetricsCollector> {
683 Arc::clone(&self.metrics)
684 }
685
686 fn platform_service(&self) -> Arc<dyn PlatformService> {
687 Arc::clone(&self.platform_service)
688 }
689}
690
691impl CamelContextBuilder {
    /// Creates a builder with nothing configured, no lifecycle services and a
    /// 30-second shutdown timeout.
    pub fn new() -> Self {
        Self {
            registry: None,
            languages: None,
            metrics: None,
            platform_service: None,
            supervision_config: None,
            runtime_store: None,
            shutdown_timeout: std::time::Duration::from_secs(30),
            beans: None,
            function_invoker: None,
            lifecycle_services: Vec::new(),
        }
    }
706
    /// Uses `registry` as the context's component registry instead of a new empty one.
    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
        self.registry = Some(registry);
        self
    }
711
    /// Uses `languages` as the context's language registry instead of the built-in set.
    ///
    /// `build` still inserts its own `simple` entry into this registry.
    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
        self.languages = Some(languages);
        self
    }
716
    /// Sets the metrics collector; without one, `build` uses `NoOpMetrics`.
    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
        self.metrics = Some(metrics);
        self
    }
721
    /// Sets the platform service; without one, `build` uses `NoopPlatformService`.
    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
        self.platform_service = Some(platform_service);
        self
    }
727
    /// Enables supervision: with a config set, `build` installs a crash
    /// notifier on the controller and spawns a supervision task.
    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
        self.supervision_config = Some(config);
        self
    }
732
    /// Sets the store backing the runtime bus; without one, `build` uses the
    /// store's default value.
    pub fn runtime_store(
        mut self,
        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
    ) -> Self {
        self.runtime_store = Some(store);
        self
    }
740
    /// Overrides the 30-second default shutdown timeout copied into the context.
    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.shutdown_timeout = timeout;
        self
    }
745
    /// Sets the bean registry that `build` passes to the route controller.
    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
        self.beans = Some(beans);
        self
    }
751
    /// Queues `service` to become one of the context's lifecycle services.
    ///
    /// A metrics collector or function invoker exposed by the service
    /// overwrites whatever the builder held for that slot, including a value
    /// set through `metrics(...)` or by an earlier service.
    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
        if let Some(collector) = service.as_metrics_collector() {
            self.metrics = Some(collector);
        }
        if let Some(invoker) = service.as_function_invoker() {
            self.function_invoker = Some(invoker);
        }
        self.lifecycle_services.push(Box::new(service));
        self
    }
767
768 pub async fn build(self) -> Result<CamelContext, CamelError> {
769 let registry = self
770 .registry
771 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
772 let languages = self
773 .languages
774 .unwrap_or_else(CamelContext::built_in_languages);
775 let simple_with_resolver: Arc<dyn Language> = Arc::new(
776 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
777 let languages = Arc::clone(&languages);
778 move |name| {
779 languages
780 .lock()
781 .ok()
782 .and_then(|registry| registry.get(name).cloned())
783 }
784 })),
785 );
786 languages
787 .lock()
788 .expect("mutex poisoned: another thread panicked while holding this lock")
789 .insert("simple".to_string(), simple_with_resolver);
790 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
791 let platform_service = self
792 .platform_service
793 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
794
795 let (controller, actor_join, supervision_join) =
796 if let Some(config) = self.supervision_config {
797 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
798 let mut controller_impl = if let Some(ref beans) = self.beans {
799 DefaultRouteController::with_languages_and_beans(
800 Arc::clone(®istry),
801 Arc::clone(&languages),
802 Arc::clone(&platform_service),
803 Arc::clone(beans),
804 )
805 } else {
806 DefaultRouteController::with_languages(
807 Arc::clone(®istry),
808 Arc::clone(&languages),
809 Arc::clone(&platform_service),
810 )
811 };
812 if let Some(invoker) = self.function_invoker.clone() {
813 controller_impl = controller_impl.with_function_invoker(invoker);
814 }
815 controller_impl.set_crash_notifier(crash_tx);
816 let (controller, actor_join) = spawn_controller_actor(controller_impl);
817 let supervision_join = spawn_supervision_task(
818 controller.clone(),
819 config,
820 Some(Arc::clone(&metrics)),
821 crash_rx,
822 );
823 (controller, actor_join, Some(supervision_join))
824 } else {
825 let mut controller_impl = if let Some(ref beans) = self.beans {
826 DefaultRouteController::with_languages_and_beans(
827 Arc::clone(®istry),
828 Arc::clone(&languages),
829 Arc::clone(&platform_service),
830 Arc::clone(beans),
831 )
832 } else {
833 DefaultRouteController::with_languages(
834 Arc::clone(®istry),
835 Arc::clone(&languages),
836 Arc::clone(&platform_service),
837 )
838 };
839 if let Some(invoker) = self.function_invoker.clone() {
840 controller_impl = controller_impl.with_function_invoker(invoker);
841 }
842 let (controller, actor_join) = spawn_controller_actor(controller_impl);
843 (controller, actor_join, None)
844 };
845
846 let store = self.runtime_store.unwrap_or_default();
847 let runtime = CamelContext::build_runtime(controller.clone(), store);
848 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
849 controller
850 .try_set_runtime_handle(runtime_handle)
851 .expect("controller actor mailbox should accept initial runtime handle");
852
853 Ok(CamelContext {
854 registry,
855 route_controller: controller,
856 _actor_join: actor_join,
857 supervision_join,
858 runtime,
859 cancel_token: CancellationToken::new(),
860 metrics,
861 platform_service,
862 languages,
863 shutdown_timeout: self.shutdown_timeout,
864 services: self.lifecycle_services,
865 component_configs: HashMap::new(),
866 function_invoker: self.function_invoker,
867 })
868 }
869}
870
/// `Default` is the same as [`CamelContextBuilder::new`].
impl Default for CamelContextBuilder {
    fn default() -> Self {
        Self::new()
    }
}
876
// Unit tests for this module are loaded from `context_tests.rs` in test builds only.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;