Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::{
10    CamelError, HealthReport, HealthStatus, LeaderElector, Lifecycle, MetricsCollector,
11    NoOpMetrics, NoopLeaderElector, NoopReadinessGate, PlatformIdentity, ReadinessGate,
12    RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
13};
14use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
15use camel_language_api::Language;
16
17use crate::lifecycle::adapters::RuntimeExecutionAdapter;
18use crate::lifecycle::adapters::controller_actor::{
19    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
20};
21use crate::lifecycle::adapters::route_controller::{
22    DefaultRouteController, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::lifecycle::application::runtime_bus::RuntimeBus;
26use crate::lifecycle::domain::LanguageRegistryError;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29
30static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
31
32pub struct CamelContextBuilder {
33    registry: Option<Arc<std::sync::Mutex<Registry>>>,
34    languages: Option<SharedLanguageRegistry>,
35    metrics: Option<Arc<dyn MetricsCollector>>,
36    // Platform ports
37    leader_elector: Option<Arc<dyn LeaderElector>>,
38    readiness_gate: Option<Arc<dyn ReadinessGate>>,
39    platform_identity: Option<PlatformIdentity>,
40    supervision_config: Option<SupervisionConfig>,
41    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
42    shutdown_timeout: std::time::Duration,
43}
44
45/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
46///
47/// # Lifecycle
48///
49/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
50/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
51/// stopped context is not supported — create a new instance instead.
52pub struct CamelContext {
53    registry: Arc<std::sync::Mutex<Registry>>,
54    route_controller: RouteControllerHandle,
55    _actor_join: tokio::task::JoinHandle<()>,
56    supervision_join: Option<tokio::task::JoinHandle<()>>,
57    runtime: Arc<RuntimeBus>,
58    cancel_token: CancellationToken,
59    metrics: Arc<dyn MetricsCollector>,
60    // Platform ports
61    leader_elector: Arc<dyn LeaderElector>,
62    readiness_gate: Arc<dyn ReadinessGate>,
63    platform_identity: PlatformIdentity,
64    languages: SharedLanguageRegistry,
65    shutdown_timeout: std::time::Duration,
66    services: Vec<Box<dyn Lifecycle>>,
67    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
68}
69
70/// Opaque handle for runtime side-effect execution operations.
71///
72/// This intentionally does not expose direct lifecycle mutation APIs to callers.
73#[derive(Clone)]
74pub struct RuntimeExecutionHandle {
75    controller: RouteControllerHandle,
76    runtime: Arc<RuntimeBus>,
77}
78
79impl RuntimeExecutionHandle {
80    pub(crate) async fn add_route_definition(
81        &self,
82        definition: RouteDefinition,
83    ) -> Result<(), CamelError> {
84        use crate::lifecycle::ports::RouteRegistrationPort;
85        self.runtime.register_route(definition).await
86    }
87
88    pub(crate) async fn compile_route_definition(
89        &self,
90        definition: RouteDefinition,
91    ) -> Result<camel_api::BoxProcessor, CamelError> {
92        self.controller.compile_route_definition(definition).await
93    }
94
95    pub(crate) async fn swap_route_pipeline(
96        &self,
97        route_id: &str,
98        pipeline: camel_api::BoxProcessor,
99    ) -> Result<(), CamelError> {
100        self.controller.swap_pipeline(route_id, pipeline).await
101    }
102
103    pub(crate) async fn execute_runtime_command(
104        &self,
105        cmd: camel_api::RuntimeCommand,
106    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
107        self.runtime.execute(cmd).await
108    }
109
110    pub(crate) async fn runtime_route_status(
111        &self,
112        route_id: &str,
113    ) -> Result<Option<String>, CamelError> {
114        match self
115            .runtime
116            .ask(camel_api::RuntimeQuery::GetRouteStatus {
117                route_id: route_id.to_string(),
118            })
119            .await
120        {
121            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
122            Ok(_) => Err(CamelError::RouteError(
123                "unexpected runtime query response for route status".to_string(),
124            )),
125            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
126            Err(err) => Err(err),
127        }
128    }
129
130    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
131        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
132            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
133            Ok(_) => Err(CamelError::RouteError(
134                "unexpected runtime query response for route listing".to_string(),
135            )),
136            Err(err) => Err(err),
137        }
138    }
139
140    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
141        self.controller.route_source_hash(route_id).await
142    }
143
144    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
145        if !self.controller.route_exists(route_id).await? {
146            return Err(CamelError::RouteError(format!(
147                "Route '{}' not found",
148                route_id
149            )));
150        }
151        Ok(self
152            .controller
153            .in_flight_count(route_id)
154            .await?
155            .unwrap_or(0))
156    }
157
158    #[cfg(test)]
159    pub(crate) async fn force_start_route_for_test(
160        &self,
161        route_id: &str,
162    ) -> Result<(), CamelError> {
163        self.controller.start_route(route_id).await
164    }
165
166    #[cfg(test)]
167    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
168        self.controller.route_count().await.unwrap_or(0)
169    }
170}
171
172impl CamelContext {
173    fn built_in_languages() -> SharedLanguageRegistry {
174        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
175        languages.insert(
176            "simple".to_string(),
177            Arc::new(camel_language_simple::SimpleLanguage),
178        );
179        #[cfg(feature = "lang-js")]
180        {
181            let js_lang = camel_language_js::JsLanguage::new();
182            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
183            languages.insert("javascript".to_string(), Arc::new(js_lang));
184        }
185        #[cfg(feature = "lang-rhai")]
186        {
187            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
188            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
189        }
190        #[cfg(feature = "lang-jsonpath")]
191        {
192            languages.insert(
193                "jsonpath".to_string(),
194                Arc::new(camel_language_jsonpath::JsonPathLanguage),
195            );
196        }
197        #[cfg(feature = "lang-xpath")]
198        {
199            languages.insert(
200                "xpath".to_string(),
201                Arc::new(camel_language_xpath::XPathLanguage),
202            );
203        }
204        Arc::new(std::sync::Mutex::new(languages))
205    }
206
207    fn build_runtime(
208        controller: RouteControllerHandle,
209        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
210    ) -> Arc<RuntimeBus> {
211        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
212        Arc::new(
213            RuntimeBus::new(
214                Arc::new(store.clone()),
215                Arc::new(store.clone()),
216                Arc::new(store.clone()),
217                Arc::new(store.clone()),
218            )
219            .with_uow(Arc::new(store))
220            .with_execution(execution),
221        )
222    }
223
224    pub fn builder() -> CamelContextBuilder {
225        CamelContextBuilder::new()
226    }
227
228    /// Set a global error handler applied to all routes without a per-route handler.
229    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
230        let _ = self.route_controller.set_error_handler(config).await;
231    }
232
233    /// Enable or disable tracing globally.
234    pub async fn set_tracing(&mut self, enabled: bool) {
235        let _ = self
236            .route_controller
237            .set_tracer_config(TracerConfig {
238                enabled,
239                ..Default::default()
240            })
241            .await;
242    }
243
244    /// Configure tracing with full config.
245    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
246        // Inject metrics collector if not already set
247        let config = if config.metrics_collector.is_none() {
248            TracerConfig {
249                metrics_collector: Some(Arc::clone(&self.metrics)),
250                ..config
251            }
252        } else {
253            config
254        };
255
256        let _ = self.route_controller.set_tracer_config(config).await;
257    }
258
259    /// Builder-style: enable tracing with default config.
260    pub async fn with_tracing(mut self) -> Self {
261        self.set_tracing(true).await;
262        self
263    }
264
265    /// Builder-style: configure tracing with custom config.
266    /// Note: tracing subscriber initialization (stdout/file output) is handled
267    /// separately via init_tracing_subscriber (called in camel-config bridge).
268    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
269        self.set_tracer_config(config).await;
270        self
271    }
272
273    /// Register a lifecycle service (Apache Camel: addService pattern)
274    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
275        // Auto-register MetricsCollector if available
276        if let Some(collector) = service.as_metrics_collector() {
277            self.metrics = collector;
278        }
279
280        self.services.push(Box::new(service));
281        self
282    }
283
284    /// Register a component with this context.
285    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
286        info!(scheme = component.scheme(), "Registering component");
287        self.registry
288            .lock()
289            .expect("mutex poisoned: another thread panicked while holding this lock")
290            .register(Arc::new(component));
291    }
292
293    /// Register a language with this context, keyed by name.
294    ///
295    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
296    /// with the same name is already registered. Use
297    /// [`resolve_language`](Self::resolve_language) to check before
298    /// registering, or choose a distinct name.
299    pub fn register_language(
300        &mut self,
301        name: impl Into<String>,
302        lang: Box<dyn Language>,
303    ) -> Result<(), LanguageRegistryError> {
304        let name = name.into();
305        let mut languages = self
306            .languages
307            .lock()
308            .expect("mutex poisoned: another thread panicked while holding this lock");
309        if languages.contains_key(&name) {
310            return Err(LanguageRegistryError::AlreadyRegistered { name });
311        }
312        languages.insert(name, Arc::from(lang));
313        Ok(())
314    }
315
316    /// Resolve a language by name. Returns `None` if not registered.
317    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
318        let languages = self
319            .languages
320            .lock()
321            .expect("mutex poisoned: another thread panicked while holding this lock");
322        languages.get(name).cloned()
323    }
324
325    /// Add a route definition to this context.
326    ///
327    /// The route must have an ID. Steps are resolved immediately using registered components.
328    pub async fn add_route_definition(
329        &self,
330        definition: RouteDefinition,
331    ) -> Result<(), CamelError> {
332        use crate::lifecycle::ports::RouteRegistrationPort;
333        info!(
334            from = definition.from_uri(),
335            route_id = %definition.route_id(),
336            "Adding route definition"
337        );
338        self.runtime.register_route(definition).await
339    }
340
341    fn next_context_command_id(op: &str, route_id: &str) -> String {
342        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
343        format!("context:{op}:{route_id}:{seq}")
344    }
345
346    /// Access the component registry.
347    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
348        self.registry
349            .lock()
350            .expect("mutex poisoned: another thread panicked while holding this lock")
351    }
352
353    /// Get runtime execution handle for file-watcher integrations.
354    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
355        RuntimeExecutionHandle {
356            controller: self.route_controller.clone(),
357            runtime: Arc::clone(&self.runtime),
358        }
359    }
360
361    /// Get the metrics collector.
362    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
363        Arc::clone(&self.metrics)
364    }
365
366    /// Get the leader elector port.
367    pub fn leader_elector(&self) -> Arc<dyn LeaderElector> {
368        Arc::clone(&self.leader_elector)
369    }
370
371    /// Get the readiness gate port.
372    pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
373        Arc::clone(&self.readiness_gate)
374    }
375
376    /// Get the platform identity.
377    pub fn platform_identity(&self) -> &PlatformIdentity {
378        &self.platform_identity
379    }
380
381    /// Get runtime command/query bus handle.
382    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
383        self.runtime.clone()
384    }
385
386    /// Build a producer context wired to this runtime.
387    pub fn producer_context(&self) -> camel_api::ProducerContext {
388        camel_api::ProducerContext::new().with_runtime(self.runtime())
389    }
390
391    /// Query route status via runtime read-model.
392    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
393        match self
394            .runtime()
395            .ask(camel_api::RuntimeQuery::GetRouteStatus {
396                route_id: route_id.to_string(),
397            })
398            .await
399        {
400            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
401            Ok(_) => Err(CamelError::RouteError(
402                "unexpected runtime query response for route status".to_string(),
403            )),
404            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
405            Err(err) => Err(err),
406        }
407    }
408
409    /// Start all routes. Each route's consumer will begin producing exchanges.
410    ///
411    /// Only routes with `auto_startup == true` will be started, in order of their
412    /// `startup_order` (lower values start first).
413    pub async fn start(&mut self) -> Result<(), CamelError> {
414        info!("Starting CamelContext");
415
416        // Start lifecycle services first
417        for (i, service) in self.services.iter_mut().enumerate() {
418            info!("Starting service: {}", service.name());
419            if let Err(e) = service.start().await {
420                // Rollback: stop already started services in reverse order
421                warn!(
422                    "Service {} failed to start, rolling back {} services",
423                    service.name(),
424                    i
425                );
426                for j in (0..i).rev() {
427                    if let Err(rollback_err) = self.services[j].stop().await {
428                        warn!(
429                            "Failed to stop service {} during rollback: {}",
430                            self.services[j].name(),
431                            rollback_err
432                        );
433                    }
434                }
435                return Err(e);
436            }
437        }
438
439        // Then start routes via runtime command bus (aggregate-first),
440        // preserving route controller startup ordering metadata.
441        let route_ids = self.route_controller.auto_startup_route_ids().await?;
442        for route_id in route_ids {
443            self.runtime
444                .execute(camel_api::RuntimeCommand::StartRoute {
445                    route_id: route_id.clone(),
446                    command_id: Self::next_context_command_id("start", &route_id),
447                    causation_id: None,
448                })
449                .await?;
450        }
451
452        info!("CamelContext started");
453        Ok(())
454    }
455
456    /// Graceful shutdown with default 30-second timeout.
457    pub async fn stop(&mut self) -> Result<(), CamelError> {
458        self.stop_timeout(self.shutdown_timeout).await
459    }
460
461    /// Graceful shutdown with custom timeout.
462    ///
463    /// Note: The timeout parameter is currently not used directly; the RouteController
464    /// manages its own shutdown timeout. This may change in a future version.
465    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
466        info!("Stopping CamelContext");
467
468        // Signal cancellation (for any legacy code that might use it)
469        self.cancel_token.cancel();
470        if let Some(join) = self.supervision_join.take() {
471            join.abort();
472        }
473
474        // Stop all routes via runtime command bus (aggregate-first),
475        // preserving route controller shutdown ordering metadata.
476        let route_ids = self.route_controller.shutdown_route_ids().await?;
477        for route_id in route_ids {
478            if let Err(err) = self
479                .runtime
480                .execute(camel_api::RuntimeCommand::StopRoute {
481                    route_id: route_id.clone(),
482                    command_id: Self::next_context_command_id("stop", &route_id),
483                    causation_id: None,
484                })
485                .await
486            {
487                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
488            }
489        }
490
491        // Then stop lifecycle services in reverse insertion order (LIFO)
492        // Continue stopping all services even if some fail
493        let mut first_error = None;
494        for service in self.services.iter_mut().rev() {
495            info!("Stopping service: {}", service.name());
496            if let Err(e) = service.stop().await {
497                warn!("Service {} failed to stop: {}", service.name(), e);
498                if first_error.is_none() {
499                    first_error = Some(e);
500                }
501            }
502        }
503
504        info!("CamelContext stopped");
505
506        if let Some(e) = first_error {
507            Err(e)
508        } else {
509            Ok(())
510        }
511    }
512
513    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
514    pub fn shutdown_timeout(&self) -> std::time::Duration {
515        self.shutdown_timeout
516    }
517
518    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
519    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
520        self.shutdown_timeout = timeout;
521    }
522
523    /// Immediate abort — kills all tasks without draining.
524    pub async fn abort(&mut self) {
525        self.cancel_token.cancel();
526        if let Some(join) = self.supervision_join.take() {
527            join.abort();
528        }
529        let route_ids = self
530            .route_controller
531            .shutdown_route_ids()
532            .await
533            .unwrap_or_default();
534        for route_id in route_ids {
535            let _ = self
536                .runtime
537                .execute(camel_api::RuntimeCommand::StopRoute {
538                    route_id: route_id.clone(),
539                    command_id: Self::next_context_command_id("abort-stop", &route_id),
540                    causation_id: None,
541                })
542                .await;
543        }
544    }
545
546    /// Check health status of all registered services.
547    pub fn health_check(&self) -> HealthReport {
548        let services: Vec<ServiceHealth> = self
549            .services
550            .iter()
551            .map(|s| ServiceHealth {
552                name: s.name().to_string(),
553                status: s.status(),
554            })
555            .collect();
556
557        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
558            HealthStatus::Healthy
559        } else {
560            HealthStatus::Unhealthy
561        };
562
563        HealthReport {
564            status,
565            services,
566            ..Default::default()
567        }
568    }
569
570    /// Store a component config. Overwrites any previously stored config of the same type.
571    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
572        self.component_configs
573            .insert(TypeId::of::<T>(), Box::new(config));
574    }
575
576    /// Retrieve a stored component config by type. Returns None if not stored.
577    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
578        self.component_configs
579            .get(&TypeId::of::<T>())
580            .and_then(|b| b.downcast_ref::<T>())
581    }
582}
583
584impl ComponentRegistrar for CamelContext {
585    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
586        self.registry
587            .lock()
588            .expect("mutex poisoned: another thread panicked while holding this lock")
589            .register(component);
590    }
591}
592
593impl ComponentContext for CamelContext {
594    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
595        self.registry.lock().ok()?.get(scheme)
596    }
597
598    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
599        self.languages.lock().ok()?.get(name).cloned()
600    }
601
602    fn metrics(&self) -> Arc<dyn MetricsCollector> {
603        Arc::clone(&self.metrics)
604    }
605
606    fn leader_elector(&self) -> Arc<dyn LeaderElector> {
607        Arc::clone(&self.leader_elector)
608    }
609}
610
611impl CamelContextBuilder {
612    pub fn new() -> Self {
613        Self {
614            registry: None,
615            languages: None,
616            metrics: None,
617            leader_elector: None,
618            readiness_gate: None,
619            platform_identity: None,
620            supervision_config: None,
621            runtime_store: None,
622            shutdown_timeout: std::time::Duration::from_secs(30),
623        }
624    }
625
626    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
627        self.registry = Some(registry);
628        self
629    }
630
631    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
632        self.languages = Some(languages);
633        self
634    }
635
636    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
637        self.metrics = Some(metrics);
638        self
639    }
640
641    /// Set a custom leader elector port.
642    pub fn leader_elector(mut self, elector: Arc<dyn LeaderElector>) -> Self {
643        self.leader_elector = Some(elector);
644        self
645    }
646
647    /// Set a custom readiness gate port.
648    pub fn readiness_gate(mut self, gate: Arc<dyn ReadinessGate>) -> Self {
649        self.readiness_gate = Some(gate);
650        self
651    }
652
653    /// Set a custom platform identity.
654    pub fn platform_identity(mut self, identity: PlatformIdentity) -> Self {
655        self.platform_identity = Some(identity);
656        self
657    }
658
659    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
660        self.supervision_config = Some(config);
661        self
662    }
663
664    pub fn runtime_store(
665        mut self,
666        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
667    ) -> Self {
668        self.runtime_store = Some(store);
669        self
670    }
671
672    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
673        self.shutdown_timeout = timeout;
674        self
675    }
676
677    pub async fn build(self) -> Result<CamelContext, CamelError> {
678        let registry = self
679            .registry
680            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
681        let languages = self
682            .languages
683            .unwrap_or_else(CamelContext::built_in_languages);
684        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
685        let leader_elector = self
686            .leader_elector
687            .unwrap_or_else(|| Arc::new(NoopLeaderElector));
688        let readiness_gate = self
689            .readiness_gate
690            .unwrap_or_else(|| Arc::new(NoopReadinessGate));
691        let platform_identity = self
692            .platform_identity
693            .unwrap_or_else(|| PlatformIdentity::local("local"));
694
695        let (controller, actor_join, supervision_join) =
696            if let Some(config) = self.supervision_config {
697                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
698                let mut controller_impl = DefaultRouteController::with_languages(
699                    Arc::clone(&registry),
700                    Arc::clone(&languages),
701                    Arc::clone(&leader_elector),
702                );
703                controller_impl.set_crash_notifier(crash_tx);
704                let (controller, actor_join) = spawn_controller_actor(controller_impl);
705                let supervision_join = spawn_supervision_task(
706                    controller.clone(),
707                    config,
708                    Some(Arc::clone(&metrics)),
709                    crash_rx,
710                );
711                (controller, actor_join, Some(supervision_join))
712            } else {
713                let controller_impl = DefaultRouteController::with_languages(
714                    Arc::clone(&registry),
715                    Arc::clone(&languages),
716                    Arc::clone(&leader_elector),
717                );
718                let (controller, actor_join) = spawn_controller_actor(controller_impl);
719                (controller, actor_join, None)
720            };
721
722        let store = self.runtime_store.unwrap_or_default();
723        let runtime = CamelContext::build_runtime(controller.clone(), store);
724        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
725        controller
726            .try_set_runtime_handle(runtime_handle)
727            .expect("controller actor mailbox should accept initial runtime handle");
728
729        Ok(CamelContext {
730            registry,
731            route_controller: controller,
732            _actor_join: actor_join,
733            supervision_join,
734            runtime,
735            cancel_token: CancellationToken::new(),
736            metrics,
737            leader_elector,
738            readiness_gate,
739            platform_identity,
740            languages,
741            shutdown_timeout: self.shutdown_timeout,
742            services: Vec::new(),
743            component_configs: HashMap::new(),
744        })
745    }
746}
747
748impl Default for CamelContextBuilder {
749    fn default() -> Self {
750        Self::new()
751    }
752}
753
754#[cfg(test)]
755#[path = "context_tests.rs"]
756mod context_tests;