use std::collections::HashMap;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use camel_api::{
CamelError, FunctionInvoker, Lifecycle, MetricsCollector, NoOpMetrics, NoopPlatformService,
PlatformService, SupervisionConfig,
};
use camel_language_api::Language;
use super::context::{CamelContext, FromParts};
use crate::health_registry::HealthCheckRegistry;
use crate::lifecycle::adapters::RuntimeExecutionAdapter;
use crate::lifecycle::adapters::controller_actor::{
RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
};
use crate::lifecycle::adapters::route_controller::{
DefaultRouteController, SharedLanguageRegistry,
};
use crate::lifecycle::application::runtime_bus::RuntimeBus;
use crate::lifecycle::ports::RuntimeExecutionPort;
use crate::shared::components::domain::Registry;
use crate::template::TemplateRegistry;
type ExecutionFactory =
Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
pub struct CamelContextBuilder {
registry: Option<Arc<std::sync::Mutex<Registry>>>,
languages: Option<SharedLanguageRegistry>,
metrics: Option<Arc<dyn MetricsCollector>>,
platform_service: Option<Arc<dyn PlatformService>>,
supervision_config: Option<SupervisionConfig>,
runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
shutdown_timeout: std::time::Duration,
beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
function_invoker: Option<Arc<dyn FunctionInvoker>>,
lifecycle_services: Vec<Box<dyn Lifecycle>>,
execution_factory: Option<ExecutionFactory>,
health_registry: Option<Arc<HealthCheckRegistry>>,
template_registry: Option<Arc<TemplateRegistry>>,
}
impl CamelContextBuilder {
pub fn new() -> Self {
Self {
registry: None,
languages: None,
metrics: None,
platform_service: None,
supervision_config: None,
runtime_store: None,
shutdown_timeout: std::time::Duration::from_secs(5),
beans: None,
function_invoker: None,
lifecycle_services: Vec::new(),
execution_factory: None,
health_registry: None,
template_registry: None,
}
}
pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
self.registry = Some(registry);
self
}
pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
self.languages = Some(languages);
self
}
pub fn with_execution_factory(
mut self,
factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
) -> Self {
self.execution_factory = Some(Arc::new(factory));
self
}
pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
self.metrics = Some(metrics);
self
}
pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
self.platform_service = Some(platform_service);
self
}
pub fn supervision(mut self, config: SupervisionConfig) -> Self {
self.supervision_config = Some(config);
self
}
pub fn runtime_store(
mut self,
store: crate::lifecycle::adapters::InMemoryRuntimeStore,
) -> Self {
self.runtime_store = Some(store);
self
}
pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
self.shutdown_timeout = timeout;
self
}
pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
self.health_registry = Some(registry);
self
}
pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
self.beans = Some(beans);
self
}
pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
if let Some(collector) = service.as_metrics_collector() {
self.metrics = Some(collector);
}
if let Some(invoker) = service.as_function_invoker() {
self.function_invoker = Some(invoker);
}
self.lifecycle_services.push(Box::new(service));
self
}
pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
self.template_registry = Some(registry);
self
}
fn built_in_languages() -> SharedLanguageRegistry {
let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
languages.insert(
"simple".to_string(),
Arc::new(camel_language_simple::SimpleLanguage::new()),
);
#[cfg(feature = "lang-js")]
{
let js_lang = camel_language_js::JsLanguage::new();
languages.insert("js".to_string(), Arc::new(js_lang.clone()));
languages.insert("javascript".to_string(), Arc::new(js_lang));
}
#[cfg(feature = "lang-rhai")]
{
let rhai_lang = camel_language_rhai::RhaiLanguage::new();
languages.insert("rhai".to_string(), Arc::new(rhai_lang));
}
#[cfg(feature = "lang-jsonpath")]
{
languages.insert(
"jsonpath".to_string(),
Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
);
}
#[cfg(feature = "lang-xpath")]
{
languages.insert(
"xpath".to_string(),
Arc::new(camel_language_xpath::XPathLanguage::new()),
);
}
Arc::new(std::sync::Mutex::new(languages))
}
fn build_runtime(
controller: RouteControllerHandle,
store: crate::lifecycle::adapters::InMemoryRuntimeStore,
execution_factory: Option<ExecutionFactory>,
health_registry: Arc<HealthCheckRegistry>,
) -> Arc<RuntimeBus> {
let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
factory(controller.clone())
} else {
Arc::new(RuntimeExecutionAdapter::new(controller))
};
Arc::new(
RuntimeBus::new(
Arc::new(store.clone()),
Arc::new(store.clone()),
Arc::new(store.clone()),
Arc::new(store.clone()),
)
.with_uow(Arc::new(store))
.with_execution(execution)
.with_health_registry(health_registry),
)
}
pub async fn build(self) -> Result<CamelContext, CamelError> {
let registry = self
.registry
.unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
let languages = self.languages.unwrap_or_else(Self::built_in_languages);
let simple_with_resolver: Arc<dyn Language> = Arc::new(
camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
let languages = Arc::clone(&languages);
move |name| {
languages
.lock()
.ok()
.and_then(|registry| registry.get(name).cloned())
}
})),
);
languages
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock") .insert("simple".to_string(), simple_with_resolver);
let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
let platform_service = self
.platform_service
.unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
let health_registry = self.health_registry.unwrap_or_else(|| {
Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
});
let (controller, actor_join, supervision_join) =
if let Some(config) = self.supervision_config {
let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
let mut controller_impl = if let Some(ref beans) = self.beans {
DefaultRouteController::with_languages_and_beans(
Arc::clone(®istry),
Arc::clone(&languages),
Arc::clone(&platform_service),
Arc::clone(beans),
)
} else {
DefaultRouteController::with_languages(
Arc::clone(®istry),
Arc::clone(&languages),
Arc::clone(&platform_service),
)
};
if let Some(invoker) = self.function_invoker.clone() {
controller_impl = controller_impl.with_function_invoker(invoker);
}
controller_impl.set_health_registry(Arc::clone(&health_registry));
controller_impl.set_crash_notifier(crash_tx);
let (controller, actor_join) = spawn_controller_actor(controller_impl);
let supervision_join = spawn_supervision_task(
controller.clone(),
config,
Some(Arc::clone(&metrics)),
crash_rx,
);
(controller, actor_join, Some(supervision_join))
} else {
let mut controller_impl = if let Some(ref beans) = self.beans {
DefaultRouteController::with_languages_and_beans(
Arc::clone(®istry),
Arc::clone(&languages),
Arc::clone(&platform_service),
Arc::clone(beans),
)
} else {
DefaultRouteController::with_languages(
Arc::clone(®istry),
Arc::clone(&languages),
Arc::clone(&platform_service),
)
};
if let Some(invoker) = self.function_invoker.clone() {
controller_impl = controller_impl.with_function_invoker(invoker);
}
controller_impl.set_health_registry(Arc::clone(&health_registry));
let (controller, actor_join) = spawn_controller_actor(controller_impl);
(controller, actor_join, None)
};
let store = self.runtime_store.unwrap_or_default();
let runtime = Self::build_runtime(
controller.clone(),
store,
self.execution_factory,
Arc::clone(&health_registry),
);
let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
controller
.try_set_runtime_handle(runtime_handle)
.expect("controller actor mailbox should accept initial runtime handle");
let template_registry = self
.template_registry
.unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
Ok(CamelContext::from_parts(FromParts {
registry,
route_controller: controller,
_actor_join: actor_join,
supervision_join,
runtime,
cancel_token: CancellationToken::new(),
metrics,
platform_service,
languages,
shutdown_timeout: self.shutdown_timeout,
services: self.lifecycle_services,
health_registry,
component_configs: HashMap::new(),
function_invoker: self.function_invoker,
template_registry,
}))
}
}
impl Default for CamelContextBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::FromParts;
#[test]
fn builder_default_has_sane_timeout() {
let builder = CamelContextBuilder::new();
assert_eq!(builder.shutdown_timeout, std::time::Duration::from_secs(5));
}
}