Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::{
10    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
11    RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
12};
13use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
14use camel_language_api::Language;
15
16use crate::lifecycle::adapters::RuntimeExecutionAdapter;
17use crate::lifecycle::adapters::controller_actor::{
18    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
19};
20use crate::lifecycle::adapters::route_controller::{
21    DefaultRouteController, SharedLanguageRegistry,
22};
23use crate::lifecycle::application::route_definition::RouteDefinition;
24use crate::lifecycle::application::runtime_bus::RuntimeBus;
25use crate::lifecycle::domain::LanguageRegistryError;
26use crate::shared::components::domain::Registry;
27use crate::shared::observability::domain::TracerConfig;
28
/// Process-wide sequence counter for context-issued runtime commands.
///
/// `CamelContext::next_context_command_id` bumps this on every call so each
/// generated command id carries a distinct trailing number.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
30
31pub struct CamelContextBuilder {
32    registry: Option<Arc<std::sync::Mutex<Registry>>>,
33    languages: Option<SharedLanguageRegistry>,
34    metrics: Option<Arc<dyn MetricsCollector>>,
35    supervision_config: Option<SupervisionConfig>,
36    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
37    shutdown_timeout: std::time::Duration,
38}
39
40/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
41///
42/// # Lifecycle
43///
44/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
45/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
46/// stopped context is not supported — create a new instance instead.
47pub struct CamelContext {
48    registry: Arc<std::sync::Mutex<Registry>>,
49    route_controller: RouteControllerHandle,
50    _actor_join: tokio::task::JoinHandle<()>,
51    supervision_join: Option<tokio::task::JoinHandle<()>>,
52    runtime: Arc<RuntimeBus>,
53    cancel_token: CancellationToken,
54    metrics: Arc<dyn MetricsCollector>,
55    languages: SharedLanguageRegistry,
56    shutdown_timeout: std::time::Duration,
57    services: Vec<Box<dyn Lifecycle>>,
58    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
59}
60
61/// Opaque handle for runtime side-effect execution operations.
62///
63/// This intentionally does not expose direct lifecycle mutation APIs to callers.
64#[derive(Clone)]
65pub struct RuntimeExecutionHandle {
66    controller: RouteControllerHandle,
67    runtime: Arc<RuntimeBus>,
68}
69
70impl RuntimeExecutionHandle {
71    pub(crate) async fn add_route_definition(
72        &self,
73        definition: RouteDefinition,
74    ) -> Result<(), CamelError> {
75        use crate::lifecycle::ports::RouteRegistrationPort;
76        self.runtime.register_route(definition).await
77    }
78
79    pub(crate) async fn compile_route_definition(
80        &self,
81        definition: RouteDefinition,
82    ) -> Result<camel_api::BoxProcessor, CamelError> {
83        self.controller.compile_route_definition(definition).await
84    }
85
86    pub(crate) async fn swap_route_pipeline(
87        &self,
88        route_id: &str,
89        pipeline: camel_api::BoxProcessor,
90    ) -> Result<(), CamelError> {
91        self.controller.swap_pipeline(route_id, pipeline).await
92    }
93
94    pub(crate) async fn execute_runtime_command(
95        &self,
96        cmd: camel_api::RuntimeCommand,
97    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
98        self.runtime.execute(cmd).await
99    }
100
101    pub(crate) async fn runtime_route_status(
102        &self,
103        route_id: &str,
104    ) -> Result<Option<String>, CamelError> {
105        match self
106            .runtime
107            .ask(camel_api::RuntimeQuery::GetRouteStatus {
108                route_id: route_id.to_string(),
109            })
110            .await
111        {
112            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
113            Ok(_) => Err(CamelError::RouteError(
114                "unexpected runtime query response for route status".to_string(),
115            )),
116            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
117            Err(err) => Err(err),
118        }
119    }
120
121    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
122        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
123            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
124            Ok(_) => Err(CamelError::RouteError(
125                "unexpected runtime query response for route listing".to_string(),
126            )),
127            Err(err) => Err(err),
128        }
129    }
130
131    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
132        self.controller.route_source_hash(route_id).await
133    }
134
135    pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
136        if !self.controller.route_exists(route_id).await? {
137            return Err(CamelError::RouteError(format!(
138                "Route '{}' not found",
139                route_id
140            )));
141        }
142        Ok(self
143            .controller
144            .in_flight_count(route_id)
145            .await?
146            .unwrap_or(0))
147    }
148
149    #[cfg(test)]
150    pub(crate) async fn force_start_route_for_test(
151        &self,
152        route_id: &str,
153    ) -> Result<(), CamelError> {
154        self.controller.start_route(route_id).await
155    }
156
157    #[cfg(test)]
158    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
159        self.controller.route_count().await.unwrap_or(0)
160    }
161}
162
163impl CamelContext {
164    fn built_in_languages() -> SharedLanguageRegistry {
165        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
166        languages.insert(
167            "simple".to_string(),
168            Arc::new(camel_language_simple::SimpleLanguage),
169        );
170        #[cfg(feature = "lang-js")]
171        {
172            let js_lang = camel_language_js::JsLanguage::new();
173            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
174            languages.insert("javascript".to_string(), Arc::new(js_lang));
175        }
176        #[cfg(feature = "lang-rhai")]
177        {
178            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
179            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
180        }
181        #[cfg(feature = "lang-jsonpath")]
182        {
183            languages.insert(
184                "jsonpath".to_string(),
185                Arc::new(camel_language_jsonpath::JsonPathLanguage),
186            );
187        }
188        #[cfg(feature = "lang-xpath")]
189        {
190            languages.insert(
191                "xpath".to_string(),
192                Arc::new(camel_language_xpath::XPathLanguage),
193            );
194        }
195        Arc::new(std::sync::Mutex::new(languages))
196    }
197
198    fn build_runtime(
199        controller: RouteControllerHandle,
200        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
201    ) -> Arc<RuntimeBus> {
202        let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
203        Arc::new(
204            RuntimeBus::new(
205                Arc::new(store.clone()),
206                Arc::new(store.clone()),
207                Arc::new(store.clone()),
208                Arc::new(store.clone()),
209            )
210            .with_uow(Arc::new(store))
211            .with_execution(execution),
212        )
213    }
214
215    pub fn builder() -> CamelContextBuilder {
216        CamelContextBuilder::new()
217    }
218
219    /// Set a global error handler applied to all routes without a per-route handler.
220    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
221        let _ = self.route_controller.set_error_handler(config).await;
222    }
223
224    /// Enable or disable tracing globally.
225    pub async fn set_tracing(&mut self, enabled: bool) {
226        let _ = self
227            .route_controller
228            .set_tracer_config(TracerConfig {
229                enabled,
230                ..Default::default()
231            })
232            .await;
233    }
234
235    /// Configure tracing with full config.
236    pub async fn set_tracer_config(&mut self, config: TracerConfig) {
237        // Inject metrics collector if not already set
238        let config = if config.metrics_collector.is_none() {
239            TracerConfig {
240                metrics_collector: Some(Arc::clone(&self.metrics)),
241                ..config
242            }
243        } else {
244            config
245        };
246
247        let _ = self.route_controller.set_tracer_config(config).await;
248    }
249
250    /// Builder-style: enable tracing with default config.
251    pub async fn with_tracing(mut self) -> Self {
252        self.set_tracing(true).await;
253        self
254    }
255
256    /// Builder-style: configure tracing with custom config.
257    /// Note: tracing subscriber initialization (stdout/file output) is handled
258    /// separately via init_tracing_subscriber (called in camel-config bridge).
259    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
260        self.set_tracer_config(config).await;
261        self
262    }
263
264    /// Register a lifecycle service (Apache Camel: addService pattern)
265    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
266        // Auto-register MetricsCollector if available
267        if let Some(collector) = service.as_metrics_collector() {
268            self.metrics = collector;
269        }
270
271        self.services.push(Box::new(service));
272        self
273    }
274
275    /// Register a component with this context.
276    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
277        info!(scheme = component.scheme(), "Registering component");
278        self.registry
279            .lock()
280            .expect("mutex poisoned: another thread panicked while holding this lock")
281            .register(Arc::new(component));
282    }
283
284    /// Register a language with this context, keyed by name.
285    ///
286    /// Returns `Err(LanguageRegistryError::AlreadyRegistered)` if a language
287    /// with the same name is already registered. Use
288    /// [`resolve_language`](Self::resolve_language) to check before
289    /// registering, or choose a distinct name.
290    pub fn register_language(
291        &mut self,
292        name: impl Into<String>,
293        lang: Box<dyn Language>,
294    ) -> Result<(), LanguageRegistryError> {
295        let name = name.into();
296        let mut languages = self
297            .languages
298            .lock()
299            .expect("mutex poisoned: another thread panicked while holding this lock");
300        if languages.contains_key(&name) {
301            return Err(LanguageRegistryError::AlreadyRegistered { name });
302        }
303        languages.insert(name, Arc::from(lang));
304        Ok(())
305    }
306
307    /// Resolve a language by name. Returns `None` if not registered.
308    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
309        let languages = self
310            .languages
311            .lock()
312            .expect("mutex poisoned: another thread panicked while holding this lock");
313        languages.get(name).cloned()
314    }
315
316    /// Add a route definition to this context.
317    ///
318    /// The route must have an ID. Steps are resolved immediately using registered components.
319    pub async fn add_route_definition(
320        &self,
321        definition: RouteDefinition,
322    ) -> Result<(), CamelError> {
323        use crate::lifecycle::ports::RouteRegistrationPort;
324        info!(
325            from = definition.from_uri(),
326            route_id = %definition.route_id(),
327            "Adding route definition"
328        );
329        self.runtime.register_route(definition).await
330    }
331
332    fn next_context_command_id(op: &str, route_id: &str) -> String {
333        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
334        format!("context:{op}:{route_id}:{seq}")
335    }
336
337    /// Access the component registry.
338    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
339        self.registry
340            .lock()
341            .expect("mutex poisoned: another thread panicked while holding this lock")
342    }
343
344    /// Get runtime execution handle for file-watcher integrations.
345    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
346        RuntimeExecutionHandle {
347            controller: self.route_controller.clone(),
348            runtime: Arc::clone(&self.runtime),
349        }
350    }
351
352    /// Get the metrics collector.
353    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
354        Arc::clone(&self.metrics)
355    }
356
357    /// Get runtime command/query bus handle.
358    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
359        self.runtime.clone()
360    }
361
362    /// Build a producer context wired to this runtime.
363    pub fn producer_context(&self) -> camel_api::ProducerContext {
364        camel_api::ProducerContext::new().with_runtime(self.runtime())
365    }
366
367    /// Query route status via runtime read-model.
368    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
369        match self
370            .runtime()
371            .ask(camel_api::RuntimeQuery::GetRouteStatus {
372                route_id: route_id.to_string(),
373            })
374            .await
375        {
376            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
377            Ok(_) => Err(CamelError::RouteError(
378                "unexpected runtime query response for route status".to_string(),
379            )),
380            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
381            Err(err) => Err(err),
382        }
383    }
384
385    /// Start all routes. Each route's consumer will begin producing exchanges.
386    ///
387    /// Only routes with `auto_startup == true` will be started, in order of their
388    /// `startup_order` (lower values start first).
389    pub async fn start(&mut self) -> Result<(), CamelError> {
390        info!("Starting CamelContext");
391
392        // Start lifecycle services first
393        for (i, service) in self.services.iter_mut().enumerate() {
394            info!("Starting service: {}", service.name());
395            if let Err(e) = service.start().await {
396                // Rollback: stop already started services in reverse order
397                warn!(
398                    "Service {} failed to start, rolling back {} services",
399                    service.name(),
400                    i
401                );
402                for j in (0..i).rev() {
403                    if let Err(rollback_err) = self.services[j].stop().await {
404                        warn!(
405                            "Failed to stop service {} during rollback: {}",
406                            self.services[j].name(),
407                            rollback_err
408                        );
409                    }
410                }
411                return Err(e);
412            }
413        }
414
415        // Then start routes via runtime command bus (aggregate-first),
416        // preserving route controller startup ordering metadata.
417        let route_ids = self.route_controller.auto_startup_route_ids().await?;
418        for route_id in route_ids {
419            self.runtime
420                .execute(camel_api::RuntimeCommand::StartRoute {
421                    route_id: route_id.clone(),
422                    command_id: Self::next_context_command_id("start", &route_id),
423                    causation_id: None,
424                })
425                .await?;
426        }
427
428        info!("CamelContext started");
429        Ok(())
430    }
431
432    /// Graceful shutdown with default 30-second timeout.
433    pub async fn stop(&mut self) -> Result<(), CamelError> {
434        self.stop_timeout(self.shutdown_timeout).await
435    }
436
437    /// Graceful shutdown with custom timeout.
438    ///
439    /// Note: The timeout parameter is currently not used directly; the RouteController
440    /// manages its own shutdown timeout. This may change in a future version.
441    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
442        info!("Stopping CamelContext");
443
444        // Signal cancellation (for any legacy code that might use it)
445        self.cancel_token.cancel();
446        if let Some(join) = self.supervision_join.take() {
447            join.abort();
448        }
449
450        // Stop all routes via runtime command bus (aggregate-first),
451        // preserving route controller shutdown ordering metadata.
452        let route_ids = self.route_controller.shutdown_route_ids().await?;
453        for route_id in route_ids {
454            if let Err(err) = self
455                .runtime
456                .execute(camel_api::RuntimeCommand::StopRoute {
457                    route_id: route_id.clone(),
458                    command_id: Self::next_context_command_id("stop", &route_id),
459                    causation_id: None,
460                })
461                .await
462            {
463                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
464            }
465        }
466
467        // Then stop lifecycle services in reverse insertion order (LIFO)
468        // Continue stopping all services even if some fail
469        let mut first_error = None;
470        for service in self.services.iter_mut().rev() {
471            info!("Stopping service: {}", service.name());
472            if let Err(e) = service.stop().await {
473                warn!("Service {} failed to stop: {}", service.name(), e);
474                if first_error.is_none() {
475                    first_error = Some(e);
476                }
477            }
478        }
479
480        info!("CamelContext stopped");
481
482        if let Some(e) = first_error {
483            Err(e)
484        } else {
485            Ok(())
486        }
487    }
488
489    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
490    pub fn shutdown_timeout(&self) -> std::time::Duration {
491        self.shutdown_timeout
492    }
493
494    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
495    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
496        self.shutdown_timeout = timeout;
497    }
498
499    /// Immediate abort — kills all tasks without draining.
500    pub async fn abort(&mut self) {
501        self.cancel_token.cancel();
502        if let Some(join) = self.supervision_join.take() {
503            join.abort();
504        }
505        let route_ids = self
506            .route_controller
507            .shutdown_route_ids()
508            .await
509            .unwrap_or_default();
510        for route_id in route_ids {
511            let _ = self
512                .runtime
513                .execute(camel_api::RuntimeCommand::StopRoute {
514                    route_id: route_id.clone(),
515                    command_id: Self::next_context_command_id("abort-stop", &route_id),
516                    causation_id: None,
517                })
518                .await;
519        }
520    }
521
522    /// Check health status of all registered services.
523    pub fn health_check(&self) -> HealthReport {
524        let services: Vec<ServiceHealth> = self
525            .services
526            .iter()
527            .map(|s| ServiceHealth {
528                name: s.name().to_string(),
529                status: s.status(),
530            })
531            .collect();
532
533        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
534            HealthStatus::Healthy
535        } else {
536            HealthStatus::Unhealthy
537        };
538
539        HealthReport {
540            status,
541            services,
542            ..Default::default()
543        }
544    }
545
546    /// Store a component config. Overwrites any previously stored config of the same type.
547    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
548        self.component_configs
549            .insert(TypeId::of::<T>(), Box::new(config));
550    }
551
552    /// Retrieve a stored component config by type. Returns None if not stored.
553    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
554        self.component_configs
555            .get(&TypeId::of::<T>())
556            .and_then(|b| b.downcast_ref::<T>())
557    }
558}
559
560impl ComponentRegistrar for CamelContext {
561    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
562        self.registry
563            .lock()
564            .expect("mutex poisoned: another thread panicked while holding this lock")
565            .register(component);
566    }
567}
568
569impl ComponentContext for CamelContext {
570    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
571        self.registry.lock().ok()?.get(scheme)
572    }
573
574    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
575        self.languages.lock().ok()?.get(name).cloned()
576    }
577
578    fn metrics(&self) -> Arc<dyn MetricsCollector> {
579        Arc::clone(&self.metrics)
580    }
581}
582
583impl CamelContextBuilder {
584    pub fn new() -> Self {
585        Self {
586            registry: None,
587            languages: None,
588            metrics: None,
589            supervision_config: None,
590            runtime_store: None,
591            shutdown_timeout: std::time::Duration::from_secs(30),
592        }
593    }
594
595    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
596        self.registry = Some(registry);
597        self
598    }
599
600    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
601        self.languages = Some(languages);
602        self
603    }
604
605    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
606        self.metrics = Some(metrics);
607        self
608    }
609
610    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
611        self.supervision_config = Some(config);
612        self
613    }
614
615    pub fn runtime_store(
616        mut self,
617        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
618    ) -> Self {
619        self.runtime_store = Some(store);
620        self
621    }
622
623    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
624        self.shutdown_timeout = timeout;
625        self
626    }
627
628    pub async fn build(self) -> Result<CamelContext, CamelError> {
629        let registry = self
630            .registry
631            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
632        let languages = self
633            .languages
634            .unwrap_or_else(CamelContext::built_in_languages);
635        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
636
637        let (controller, actor_join, supervision_join) =
638            if let Some(config) = self.supervision_config {
639                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
640                let mut controller_impl = DefaultRouteController::with_languages(
641                    Arc::clone(&registry),
642                    Arc::clone(&languages),
643                );
644                controller_impl.set_crash_notifier(crash_tx);
645                let (controller, actor_join) = spawn_controller_actor(controller_impl);
646                let supervision_join = spawn_supervision_task(
647                    controller.clone(),
648                    config,
649                    Some(Arc::clone(&metrics)),
650                    crash_rx,
651                );
652                (controller, actor_join, Some(supervision_join))
653            } else {
654                let controller_impl = DefaultRouteController::with_languages(
655                    Arc::clone(&registry),
656                    Arc::clone(&languages),
657                );
658                let (controller, actor_join) = spawn_controller_actor(controller_impl);
659                (controller, actor_join, None)
660            };
661
662        let store = self.runtime_store.unwrap_or_default();
663        let runtime = CamelContext::build_runtime(controller.clone(), store);
664        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
665        controller
666            .try_set_runtime_handle(runtime_handle)
667            .expect("controller actor mailbox should accept initial runtime handle");
668
669        Ok(CamelContext {
670            registry,
671            route_controller: controller,
672            _actor_join: actor_join,
673            supervision_join,
674            runtime,
675            cancel_token: CancellationToken::new(),
676            metrics,
677            languages,
678            shutdown_timeout: self.shutdown_timeout,
679            services: Vec::new(),
680            component_configs: HashMap::new(),
681        })
682    }
683}
684
685impl Default for CamelContextBuilder {
686    fn default() -> Self {
687        Self::new()
688    }
689}
690
691#[cfg(test)]
692#[path = "context_tests.rs"]
693mod context_tests;