1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, FunctionInvoker, HealthReport, HealthStatus, Lifecycle, MetricsCollector,
12 NoOpMetrics, NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate,
13 RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23 DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::lifecycle::ports::RuntimeExecutionPort;
29use crate::shared::components::domain::Registry;
30use crate::shared::observability::domain::TracerConfig;
31
32static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
33
34type ExecutionFactory =
35 Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
36
37pub struct CamelContextBuilder {
38 registry: Option<Arc<std::sync::Mutex<Registry>>>,
39 languages: Option<SharedLanguageRegistry>,
40 metrics: Option<Arc<dyn MetricsCollector>>,
41 platform_service: Option<Arc<dyn PlatformService>>,
43 supervision_config: Option<SupervisionConfig>,
44 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
45 shutdown_timeout: std::time::Duration,
46 beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
47 function_invoker: Option<Arc<dyn FunctionInvoker>>,
48 lifecycle_services: Vec<Box<dyn Lifecycle>>,
49 execution_factory: Option<ExecutionFactory>,
50}
51
52pub struct CamelContext {
60 registry: Arc<std::sync::Mutex<Registry>>,
61 route_controller: RouteControllerHandle,
62 _actor_join: tokio::task::JoinHandle<()>,
63 supervision_join: Option<tokio::task::JoinHandle<()>>,
64 runtime: Arc<RuntimeBus>,
65 cancel_token: CancellationToken,
66 metrics: Arc<dyn MetricsCollector>,
67 platform_service: Arc<dyn PlatformService>,
69 languages: SharedLanguageRegistry,
70 shutdown_timeout: std::time::Duration,
71 services: Vec<Box<dyn Lifecycle>>,
72 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
73 function_invoker: Option<Arc<dyn FunctionInvoker>>,
74}
75
76#[derive(Clone)]
80pub struct RuntimeExecutionHandle {
81 controller: RouteControllerHandle,
82 runtime: Arc<RuntimeBus>,
83 function_invoker: Option<Arc<dyn FunctionInvoker>>,
84}
85
86impl RuntimeExecutionHandle {
87 pub(crate) async fn add_route_definition(
88 &self,
89 definition: RouteDefinition,
90 ) -> Result<(), CamelError> {
91 use crate::lifecycle::ports::RouteRegistrationPort;
92 self.runtime
93 .register_route(definition)
94 .await
95 .map_err(Into::into)
96 }
97
98 pub(crate) async fn compile_route_definition(
99 &self,
100 definition: RouteDefinition,
101 ) -> Result<camel_api::BoxProcessor, CamelError> {
102 self.controller.compile_route_definition(definition).await
103 }
104
105 pub(crate) async fn compile_route_definition_with_generation(
106 &self,
107 definition: RouteDefinition,
108 generation: u64,
109 ) -> Result<camel_api::BoxProcessor, CamelError> {
110 self.controller
111 .compile_route_definition_with_generation(definition, generation)
112 .await
113 }
114
115 pub(crate) async fn prepare_route_definition_with_generation(
116 &self,
117 definition: RouteDefinition,
118 generation: u64,
119 ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
120 self.controller
121 .prepare_route_definition_with_generation(definition, generation)
122 .await
123 }
124
125 pub(crate) async fn insert_prepared_route(
126 &self,
127 prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
128 ) -> Result<(), CamelError> {
129 self.controller.insert_prepared_route(prepared).await
130 }
131
132 pub(crate) async fn remove_route_preserving_functions(
133 &self,
134 route_id: String,
135 ) -> Result<(), CamelError> {
136 self.controller
137 .remove_route_preserving_functions(route_id)
138 .await
139 }
140
141 pub(crate) async fn register_route_aggregate(
142 &self,
143 route_id: String,
144 ) -> Result<(), CamelError> {
145 self.runtime.register_aggregate_only(route_id).await
146 }
147
148 pub(crate) async fn swap_route_pipeline(
149 &self,
150 route_id: &str,
151 pipeline: camel_api::BoxProcessor,
152 ) -> Result<(), CamelError> {
153 self.controller.swap_pipeline(route_id, pipeline).await
154 }
155
156 pub(crate) async fn execute_runtime_command(
157 &self,
158 cmd: camel_api::RuntimeCommand,
159 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
160 self.runtime.execute(cmd).await
161 }
162
163 pub(crate) async fn runtime_route_status(
164 &self,
165 route_id: &str,
166 ) -> Result<Option<String>, CamelError> {
167 match self
168 .runtime
169 .ask(camel_api::RuntimeQuery::GetRouteStatus {
170 route_id: route_id.to_string(),
171 })
172 .await
173 {
174 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
175 Ok(_) => Err(CamelError::RouteError(
176 "unexpected runtime query response for route status".to_string(),
177 )),
178 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
179 Err(err) => Err(err),
180 }
181 }
182
183 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
184 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
185 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
186 Ok(_) => Err(CamelError::RouteError(
187 "unexpected runtime query response for route listing".to_string(),
188 )),
189 Err(err) => Err(err),
190 }
191 }
192
193 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
194 self.controller.route_source_hash(route_id).await
195 }
196
197 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
198 if !self.controller.route_exists(route_id).await? {
199 return Err(CamelError::RouteError(format!(
200 "Route '{}' not found",
201 route_id
202 )));
203 }
204 Ok(self
205 .controller
206 .in_flight_count(route_id)
207 .await?
208 .unwrap_or(0))
209 }
210
211 pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
212 self.function_invoker.clone()
213 }
214
215 #[cfg(test)]
216 pub(crate) async fn force_start_route_for_test(
217 &self,
218 route_id: &str,
219 ) -> Result<(), CamelError> {
220 self.controller.start_route(route_id).await
221 }
222
223 pub async fn controller_route_count_for_test(&self) -> usize {
224 self.controller.route_count().await.unwrap_or(0)
225 }
226}
227
228impl CamelContext {
229 fn built_in_languages() -> SharedLanguageRegistry {
230 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
231 languages.insert(
232 "simple".to_string(),
233 Arc::new(camel_language_simple::SimpleLanguage::new()),
234 );
235 #[cfg(feature = "lang-js")]
236 {
237 let js_lang = camel_language_js::JsLanguage::new();
238 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
239 languages.insert("javascript".to_string(), Arc::new(js_lang));
240 }
241 #[cfg(feature = "lang-rhai")]
242 {
243 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
244 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
245 }
246 #[cfg(feature = "lang-jsonpath")]
247 {
248 languages.insert(
249 "jsonpath".to_string(),
250 Arc::new(camel_language_jsonpath::JsonPathLanguage),
251 );
252 }
253 #[cfg(feature = "lang-xpath")]
254 {
255 languages.insert(
256 "xpath".to_string(),
257 Arc::new(camel_language_xpath::XPathLanguage),
258 );
259 }
260 Arc::new(std::sync::Mutex::new(languages))
261 }
262
263 fn build_runtime(
264 controller: RouteControllerHandle,
265 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
266 execution_factory: Option<ExecutionFactory>,
267 ) -> Arc<RuntimeBus> {
268 let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
269 factory(controller.clone())
270 } else {
271 Arc::new(RuntimeExecutionAdapter::new(controller))
272 };
273 Arc::new(
274 RuntimeBus::new(
275 Arc::new(store.clone()),
276 Arc::new(store.clone()),
277 Arc::new(store.clone()),
278 Arc::new(store.clone()),
279 )
280 .with_uow(Arc::new(store))
281 .with_execution(execution),
282 )
283 }
284
285 pub fn builder() -> CamelContextBuilder {
286 CamelContextBuilder::new()
287 }
288
289 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
291 let _ = self.route_controller.set_error_handler(config).await;
292 }
293
294 pub async fn set_tracing(&mut self, enabled: bool) {
296 let _ = self
297 .route_controller
298 .set_tracer_config(TracerConfig {
299 enabled,
300 ..Default::default()
301 })
302 .await;
303 }
304
305 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
307 let config = if config.metrics_collector.is_none() {
309 TracerConfig {
310 metrics_collector: Some(Arc::clone(&self.metrics)),
311 ..config
312 }
313 } else {
314 config
315 };
316
317 let _ = self.route_controller.set_tracer_config(config).await;
318 }
319
320 pub async fn with_tracing(mut self) -> Self {
322 self.set_tracing(true).await;
323 self
324 }
325
326 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
330 self.set_tracer_config(config).await;
331 self
332 }
333
334 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
343 if let Some(collector) = service.as_metrics_collector() {
344 self.metrics = collector;
345 }
346 if let Some(invoker) = service.as_function_invoker() {
347 self.function_invoker = Some(invoker.clone());
348 if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
349 tracing::warn!("Failed to propagate function invoker to route controller: {e}");
350 }
351 }
352
353 self.services.push(Box::new(service));
354 self
355 }
356
357 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
359 info!(scheme = component.scheme(), "Registering component");
360 self.registry
361 .lock()
362 .expect("mutex poisoned: another thread panicked while holding this lock") .register(Arc::new(component));
364 }
365
366 pub fn register_language(
373 &mut self,
374 name: impl Into<String>,
375 lang: Box<dyn Language>,
376 ) -> Result<(), LanguageRegistryError> {
377 let name = name.into();
378 let mut languages = self
379 .languages
380 .lock()
381 .expect("mutex poisoned: another thread panicked while holding this lock"); if languages.contains_key(&name) {
383 return Err(LanguageRegistryError::AlreadyRegistered { name });
384 }
385 languages.insert(name, Arc::from(lang));
386 Ok(())
387 }
388
389 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
391 let languages = self
392 .languages
393 .lock()
394 .expect("mutex poisoned: another thread panicked while holding this lock"); languages.get(name).cloned()
396 }
397
398 pub async fn add_route_definition(
402 &self,
403 definition: RouteDefinition,
404 ) -> Result<(), CamelError> {
405 use crate::lifecycle::ports::RouteRegistrationPort;
406 info!(
407 from = definition.from_uri(),
408 route_id = %definition.route_id(),
409 "Adding route definition"
410 );
411 self.runtime
412 .register_route(definition)
413 .await
414 .map_err(Into::into)
415 }
416
417 fn next_context_command_id(op: &str, route_id: &str) -> String {
418 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
419 format!("context:{op}:{route_id}:{seq}")
420 }
421
422 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
424 self.registry
425 .lock()
426 .expect("mutex poisoned: another thread panicked while holding this lock") }
428
429 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
431 Arc::clone(&self.registry)
432 }
433
434 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
436 RuntimeExecutionHandle {
437 controller: self.route_controller.clone(),
438 runtime: Arc::clone(&self.runtime),
439 function_invoker: self.function_invoker.clone(),
440 }
441 }
442
443 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
445 Arc::clone(&self.metrics)
446 }
447
448 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
450 Arc::clone(&self.platform_service)
451 }
452
453 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
455 self.platform_service.readiness_gate()
456 }
457
458 pub fn platform_identity(&self) -> PlatformIdentity {
460 self.platform_service.identity()
461 }
462
463 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
465 self.platform_service.leadership()
466 }
467
468 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
470 self.runtime.clone()
471 }
472
473 pub fn producer_context(&self) -> camel_api::ProducerContext {
475 camel_api::ProducerContext::new().with_runtime(self.runtime())
476 }
477
478 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
480 match self
481 .runtime()
482 .ask(camel_api::RuntimeQuery::GetRouteStatus {
483 route_id: route_id.to_string(),
484 })
485 .await
486 {
487 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
488 Ok(_) => Err(CamelError::RouteError(
489 "unexpected runtime query response for route status".to_string(),
490 )),
491 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
492 Err(err) => Err(err),
493 }
494 }
495
496 pub async fn start(&mut self) -> Result<(), CamelError> {
501 info!("Starting CamelContext");
502
503 for (i, service) in self.services.iter_mut().enumerate() {
505 info!("Starting service: {}", service.name());
506 if let Err(e) = service.start().await {
507 warn!(
509 "Service {} failed to start, rolling back {} services",
510 service.name(),
511 i
512 );
513 for j in (0..i).rev() {
514 if let Err(rollback_err) = self.services[j].stop().await {
515 warn!(
516 "Failed to stop service {} during rollback: {}",
517 self.services[j].name(),
518 rollback_err
519 );
520 }
521 }
522 return Err(e);
523 }
524 }
525
526 let route_ids = self.route_controller.auto_startup_route_ids().await?;
529 for route_id in route_ids {
530 self.runtime
531 .execute(camel_api::RuntimeCommand::StartRoute {
532 route_id: route_id.clone(),
533 command_id: Self::next_context_command_id("start", &route_id),
534 causation_id: None,
535 })
536 .await?;
537 }
538
539 info!("CamelContext started");
540 Ok(())
541 }
542
543 pub async fn stop(&mut self) -> Result<(), CamelError> {
545 self.stop_timeout(self.shutdown_timeout).await
546 }
547
548 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
553 info!("Stopping CamelContext");
554
555 self.cancel_token.cancel();
557 if let Some(join) = self.supervision_join.take() {
558 join.abort();
559 }
560
561 let route_ids = self.route_controller.shutdown_route_ids().await?;
564 for route_id in route_ids {
565 if let Err(err) = self
566 .runtime
567 .execute(camel_api::RuntimeCommand::StopRoute {
568 route_id: route_id.clone(),
569 command_id: Self::next_context_command_id("stop", &route_id),
570 causation_id: None,
571 })
572 .await
573 {
574 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
575 }
576 }
577
578 let mut first_error = None;
581 for service in self.services.iter_mut().rev() {
582 info!("Stopping service: {}", service.name());
583 if let Err(e) = service.stop().await {
584 warn!("Service {} failed to stop: {}", service.name(), e);
585 if first_error.is_none() {
586 first_error = Some(e);
587 }
588 }
589 }
590
591 info!("CamelContext stopped");
592
593 if let Some(e) = first_error {
594 Err(e)
595 } else {
596 Ok(())
597 }
598 }
599
600 pub fn shutdown_timeout(&self) -> std::time::Duration {
602 self.shutdown_timeout
603 }
604
605 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
607 self.shutdown_timeout = timeout;
608 }
609
610 pub async fn abort(&mut self) {
612 self.cancel_token.cancel();
613 if let Some(join) = self.supervision_join.take() {
614 join.abort();
615 }
616 let route_ids = self
617 .route_controller
618 .shutdown_route_ids()
619 .await
620 .unwrap_or_default();
621 for route_id in route_ids {
622 let _ = self
623 .runtime
624 .execute(camel_api::RuntimeCommand::StopRoute {
625 route_id: route_id.clone(),
626 command_id: Self::next_context_command_id("abort-stop", &route_id),
627 causation_id: None,
628 })
629 .await;
630 }
631
632 for service in self.services.iter_mut().rev() {
633 let name = service.name().to_string();
634 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
635 Ok(Ok(())) => info!("Aborted service: {}", name),
636 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
637 Err(_) => warn!("Service {} timed out during abort (5s)", name),
638 }
639 }
640 }
641
642 pub fn health_check(&self) -> HealthReport {
644 let services: Vec<ServiceHealth> = self
645 .services
646 .iter()
647 .map(|s| ServiceHealth {
648 name: s.name().to_string(),
649 status: s.status(),
650 })
651 .collect();
652
653 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
654 HealthStatus::Healthy
655 } else {
656 HealthStatus::Unhealthy
657 };
658
659 HealthReport {
660 status,
661 services,
662 ..Default::default()
663 }
664 }
665
666 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
668 self.component_configs
669 .insert(TypeId::of::<T>(), Box::new(config));
670 }
671
672 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
674 self.component_configs
675 .get(&TypeId::of::<T>())
676 .and_then(|b| b.downcast_ref::<T>())
677 }
678}
679
680impl ComponentRegistrar for CamelContext {
681 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
682 self.registry
683 .lock()
684 .expect("mutex poisoned: another thread panicked while holding this lock") .register(component);
686 }
687}
688
689impl ComponentContext for CamelContext {
690 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
691 self.registry.lock().ok()?.get(scheme)
692 }
693
694 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
695 self.languages.lock().ok()?.get(name).cloned()
696 }
697
698 fn metrics(&self) -> Arc<dyn MetricsCollector> {
699 Arc::clone(&self.metrics)
700 }
701
702 fn platform_service(&self) -> Arc<dyn PlatformService> {
703 Arc::clone(&self.platform_service)
704 }
705}
706
707impl CamelContextBuilder {
708 pub fn new() -> Self {
709 Self {
710 registry: None,
711 languages: None,
712 metrics: None,
713 platform_service: None,
714 supervision_config: None,
715 runtime_store: None,
716 shutdown_timeout: std::time::Duration::from_secs(30),
717 beans: None,
718 function_invoker: None,
719 lifecycle_services: Vec::new(),
720 execution_factory: None,
721 }
722 }
723
724 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
725 self.registry = Some(registry);
726 self
727 }
728
729 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
730 self.languages = Some(languages);
731 self
732 }
733
734 pub fn with_execution_factory(
735 mut self,
736 factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
737 ) -> Self {
738 self.execution_factory = Some(Arc::new(factory));
739 self
740 }
741
742 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
743 self.metrics = Some(metrics);
744 self
745 }
746
747 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
749 self.platform_service = Some(platform_service);
750 self
751 }
752
753 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
754 self.supervision_config = Some(config);
755 self
756 }
757
758 pub fn runtime_store(
759 mut self,
760 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
761 ) -> Self {
762 self.runtime_store = Some(store);
763 self
764 }
765
766 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
767 self.shutdown_timeout = timeout;
768 self
769 }
770
771 pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
773 self.beans = Some(beans);
774 self
775 }
776
777 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
783 if let Some(collector) = service.as_metrics_collector() {
784 self.metrics = Some(collector);
785 }
786 if let Some(invoker) = service.as_function_invoker() {
787 self.function_invoker = Some(invoker);
788 }
789 self.lifecycle_services.push(Box::new(service));
790 self
791 }
792
793 pub async fn build(self) -> Result<CamelContext, CamelError> {
794 let registry = self
795 .registry
796 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
797 let languages = self
798 .languages
799 .unwrap_or_else(CamelContext::built_in_languages);
800 let simple_with_resolver: Arc<dyn Language> = Arc::new(
801 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
802 let languages = Arc::clone(&languages);
803 move |name| {
804 languages
805 .lock()
806 .ok()
807 .and_then(|registry| registry.get(name).cloned())
808 }
809 })),
810 );
811 languages
812 .lock()
813 .expect("mutex poisoned: another thread panicked while holding this lock") .insert("simple".to_string(), simple_with_resolver);
815 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
816 let platform_service = self
817 .platform_service
818 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
819
820 let (controller, actor_join, supervision_join) =
821 if let Some(config) = self.supervision_config {
822 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
823 let mut controller_impl = if let Some(ref beans) = self.beans {
824 DefaultRouteController::with_languages_and_beans(
825 Arc::clone(®istry),
826 Arc::clone(&languages),
827 Arc::clone(&platform_service),
828 Arc::clone(beans),
829 )
830 } else {
831 DefaultRouteController::with_languages(
832 Arc::clone(®istry),
833 Arc::clone(&languages),
834 Arc::clone(&platform_service),
835 )
836 };
837 if let Some(invoker) = self.function_invoker.clone() {
838 controller_impl = controller_impl.with_function_invoker(invoker);
839 }
840 controller_impl.set_crash_notifier(crash_tx);
841 let (controller, actor_join) = spawn_controller_actor(controller_impl);
842 let supervision_join = spawn_supervision_task(
843 controller.clone(),
844 config,
845 Some(Arc::clone(&metrics)),
846 crash_rx,
847 );
848 (controller, actor_join, Some(supervision_join))
849 } else {
850 let mut controller_impl = if let Some(ref beans) = self.beans {
851 DefaultRouteController::with_languages_and_beans(
852 Arc::clone(®istry),
853 Arc::clone(&languages),
854 Arc::clone(&platform_service),
855 Arc::clone(beans),
856 )
857 } else {
858 DefaultRouteController::with_languages(
859 Arc::clone(®istry),
860 Arc::clone(&languages),
861 Arc::clone(&platform_service),
862 )
863 };
864 if let Some(invoker) = self.function_invoker.clone() {
865 controller_impl = controller_impl.with_function_invoker(invoker);
866 }
867 let (controller, actor_join) = spawn_controller_actor(controller_impl);
868 (controller, actor_join, None)
869 };
870
871 let store = self.runtime_store.unwrap_or_default();
872 let runtime =
873 CamelContext::build_runtime(controller.clone(), store, self.execution_factory);
874 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
875 controller
876 .try_set_runtime_handle(runtime_handle)
877 .expect("controller actor mailbox should accept initial runtime handle"); Ok(CamelContext {
880 registry,
881 route_controller: controller,
882 _actor_join: actor_join,
883 supervision_join,
884 runtime,
885 cancel_token: CancellationToken::new(),
886 metrics,
887 platform_service,
888 languages,
889 shutdown_timeout: self.shutdown_timeout,
890 services: self.lifecycle_services,
891 component_configs: HashMap::new(),
892 function_invoker: self.function_invoker,
893 })
894 }
895}
896
897impl Default for CamelContextBuilder {
898 fn default() -> Self {
899 Self::new()
900 }
901}
902
903#[cfg(test)]
904#[path = "context_tests.rs"]
905mod context_tests;