Skip to main content

camel_core/
context.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{
9    CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus, SupervisionConfig,
10};
11use camel_component::Component;
12use camel_language_api::Language;
13use camel_language_api::LanguageError;
14
15use crate::config::TracerConfig;
16use crate::registry::Registry;
17use crate::route::RouteDefinition;
18use crate::route_controller::{
19    DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
20};
21use crate::supervising_route_controller::SupervisingRouteController;
22
23/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
24///
25/// # Lifecycle
26///
27/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
28/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
29/// stopped context is not supported — create a new instance instead.
30pub struct CamelContext {
31    registry: Arc<std::sync::Mutex<Registry>>,
32    route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
33    cancel_token: CancellationToken,
34    metrics: Arc<dyn MetricsCollector>,
35    languages: SharedLanguageRegistry,
36    shutdown_timeout: std::time::Duration,
37}
38
39impl CamelContext {
40    fn built_in_languages() -> SharedLanguageRegistry {
41        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
42        languages.insert(
43            "simple".to_string(),
44            Arc::new(camel_language_simple::SimpleLanguage),
45        );
46        Arc::new(std::sync::Mutex::new(languages))
47    }
48
49    /// Create a new, empty CamelContext.
50    pub fn new() -> Self {
51        Self::with_metrics(Arc::new(NoOpMetrics))
52    }
53
54    /// Create a new CamelContext with a custom metrics collector.
55    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
56        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
57        let languages = Self::built_in_languages();
58        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
59            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
60        ));
61
62        // Set self-ref so DefaultRouteController can create ProducerContext
63        // Use try_lock since we just created it and nobody else has access yet
64        controller
65            .try_lock()
66            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
67            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
68
69        Self {
70            registry,
71            route_controller: controller,
72            cancel_token: CancellationToken::new(),
73            metrics,
74            languages,
75            shutdown_timeout: std::time::Duration::from_secs(30),
76        }
77    }
78
79    /// Create a new CamelContext with route supervision enabled.
80    ///
81    /// The supervision config controls automatic restart behavior for crashed routes.
82    pub fn with_supervision(config: SupervisionConfig) -> Self {
83        Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
84    }
85
86    /// Create a new CamelContext with route supervision and custom metrics.
87    ///
88    /// The supervision config controls automatic restart behavior for crashed routes.
89    pub fn with_supervision_and_metrics(
90        config: SupervisionConfig,
91        metrics: Arc<dyn MetricsCollector>,
92    ) -> Self {
93        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
94        let languages = Self::built_in_languages();
95        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
96            SupervisingRouteController::with_languages(
97                Arc::clone(&registry),
98                config,
99                Arc::clone(&languages),
100            )
101            .with_metrics(Arc::clone(&metrics)),
102        ));
103
104        // Set self-ref so SupervisingRouteController can create ProducerContext
105        // Use try_lock since we just created it and nobody else has access yet
106        controller
107            .try_lock()
108            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
109            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
110
111        Self {
112            registry,
113            route_controller: controller,
114            cancel_token: CancellationToken::new(),
115            metrics,
116            languages,
117            shutdown_timeout: std::time::Duration::from_secs(30),
118        }
119    }
120
121    /// Set a global error handler applied to all routes without a per-route handler.
122    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
123        self.route_controller
124            .try_lock()
125            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
126            .set_error_handler(config);
127    }
128
129    /// Enable or disable tracing globally.
130    pub fn set_tracing(&mut self, enabled: bool) {
131        self.route_controller
132            .try_lock()
133            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
134            .set_tracer_config(&TracerConfig {
135                enabled,
136                ..Default::default()
137            });
138    }
139
140    /// Configure tracing with full config.
141    pub fn set_tracer_config(&mut self, config: TracerConfig) {
142        self.route_controller
143            .try_lock()
144            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
145            .set_tracer_config(&config);
146    }
147
148    /// Builder-style: enable tracing with default config.
149    pub fn with_tracing(mut self) -> Self {
150        self.set_tracing(true);
151        self
152    }
153
154    /// Builder-style: configure tracing with custom config.
155    /// Note: tracing subscriber initialization (stdout/file output) is handled
156    /// separately via init_tracing_subscriber (called in camel-config bridge).
157    pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
158        self.set_tracer_config(config);
159        self
160    }
161
162    /// Register a component with this context.
163    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
164        info!(scheme = component.scheme(), "Registering component");
165        self.registry
166            .lock()
167            .expect("mutex poisoned: another thread panicked while holding this lock")
168            .register(component);
169    }
170
171    /// Register a language with this context, keyed by name.
172    ///
173    /// Returns `Err(LanguageError::AlreadyRegistered)` if a language with the
174    /// same name is already registered. Use [`resolve_language`](Self::resolve_language)
175    /// to check before registering, or choose a distinct name.
176    pub fn register_language(
177        &mut self,
178        name: impl Into<String>,
179        lang: Box<dyn Language>,
180    ) -> Result<(), LanguageError> {
181        let name = name.into();
182        let mut languages = self
183            .languages
184            .lock()
185            .expect("mutex poisoned: another thread panicked while holding this lock");
186        if languages.contains_key(&name) {
187            return Err(LanguageError::AlreadyRegistered(name));
188        }
189        languages.insert(name, Arc::from(lang));
190        Ok(())
191    }
192
193    /// Resolve a language by name. Returns `None` if not registered.
194    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
195        let languages = self
196            .languages
197            .lock()
198            .expect("mutex poisoned: another thread panicked while holding this lock");
199        languages.get(name).cloned()
200    }
201
202    /// Add a route definition to this context.
203    ///
204    /// The route must have an ID. Steps are resolved immediately using registered components.
205    pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
206        info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
207
208        self.route_controller
209            .try_lock()
210            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
211            .add_route(definition)
212    }
213
214    /// Access the component registry.
215    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
216        self.registry
217            .lock()
218            .expect("mutex poisoned: another thread panicked while holding this lock")
219    }
220
221    /// Access the route controller.
222    pub fn route_controller(&self) -> &Arc<Mutex<dyn RouteControllerInternal>> {
223        &self.route_controller
224    }
225
226    /// Get the metrics collector.
227    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
228        Arc::clone(&self.metrics)
229    }
230
231    /// Get the status of a route by ID.
232    pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
233        self.route_controller
234            .try_lock()
235            .ok()?
236            .route_status(route_id)
237    }
238
239    /// Start all routes. Each route's consumer will begin producing exchanges.
240    ///
241    /// Only routes with `auto_startup == true` will be started, in order of their
242    /// `startup_order` (lower values start first).
243    pub async fn start(&mut self) -> Result<(), CamelError> {
244        info!("Starting CamelContext");
245
246        self.route_controller
247            .lock()
248            .await
249            .start_all_routes()
250            .await?;
251        info!("CamelContext started");
252        Ok(())
253    }
254
255    /// Graceful shutdown with default 30-second timeout.
256    pub async fn stop(&mut self) -> Result<(), CamelError> {
257        self.stop_timeout(self.shutdown_timeout).await
258    }
259
260    /// Graceful shutdown with custom timeout.
261    ///
262    /// Note: The timeout parameter is currently not used directly; the RouteController
263    /// manages its own shutdown timeout. This may change in a future version.
264    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
265        info!("Stopping CamelContext");
266
267        // Signal cancellation (for any legacy code that might use it)
268        self.cancel_token.cancel();
269
270        // Stop all routes via the controller
271        self.route_controller.lock().await.stop_all_routes().await?;
272
273        info!("CamelContext stopped");
274        Ok(())
275    }
276
277    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
278    pub fn shutdown_timeout(&self) -> std::time::Duration {
279        self.shutdown_timeout
280    }
281
282    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
283    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
284        self.shutdown_timeout = timeout;
285    }
286
287    /// Immediate abort — kills all tasks without draining.
288    pub async fn abort(&mut self) {
289        self.cancel_token.cancel();
290        let _ = self.route_controller.lock().await.stop_all_routes().await;
291    }
292}
293
294impl Default for CamelContext {
295    fn default() -> Self {
296        Self::new()
297    }
298}
299
300#[cfg(test)]
301mod tests {
302    use super::*;
303    use crate::route::{BuilderStep, LanguageExpressionDef, RouteDefinition};
304    use camel_api::CamelError;
305    use camel_component::Endpoint;
306
307    /// Mock component for testing
308    struct MockComponent;
309
310    impl Component for MockComponent {
311        fn scheme(&self) -> &str {
312            "mock"
313        }
314
315        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
316            Err(CamelError::ComponentNotFound("mock".to_string()))
317        }
318    }
319
320    #[test]
321    fn test_context_handles_mutex_poisoning_gracefully() {
322        let mut ctx = CamelContext::new();
323
324        // Register a component successfully
325        ctx.register_component(MockComponent);
326
327        // Access registry should work even after potential panic in another thread
328        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
329            let _guard = ctx.registry();
330        }));
331
332        assert!(
333            result.is_ok(),
334            "Registry access should handle mutex poisoning"
335        );
336    }
337
338    #[test]
339    fn test_context_resolves_simple_language() {
340        let ctx = CamelContext::new();
341        let lang = ctx
342            .resolve_language("simple")
343            .expect("simple language not found");
344        assert_eq!(lang.name(), "simple");
345    }
346
347    #[test]
348    fn test_simple_language_via_context() {
349        let ctx = CamelContext::new();
350        let lang = ctx.resolve_language("simple").unwrap();
351        let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
352        let mut msg = camel_api::message::Message::default();
353        msg.set_header("x", camel_api::Value::String("hello".into()));
354        let ex = camel_api::exchange::Exchange::new(msg);
355        assert!(pred.matches(&ex).unwrap());
356    }
357
358    #[test]
359    fn test_resolve_unknown_language_returns_none() {
360        let ctx = CamelContext::new();
361        assert!(ctx.resolve_language("nonexistent").is_none());
362    }
363
364    #[test]
365    fn test_register_language_duplicate_returns_error() {
366        use camel_language_api::LanguageError;
367        struct DummyLang;
368        impl camel_language_api::Language for DummyLang {
369            fn name(&self) -> &'static str {
370                "dummy"
371            }
372            fn create_expression(
373                &self,
374                _: &str,
375            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
376                Err(LanguageError::EvalError("not implemented".into()))
377            }
378            fn create_predicate(
379                &self,
380                _: &str,
381            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
382                Err(LanguageError::EvalError("not implemented".into()))
383            }
384        }
385
386        let mut ctx = CamelContext::new();
387        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
388        let result = ctx.register_language("dummy", Box::new(DummyLang));
389        assert!(result.is_err(), "duplicate registration should fail");
390        let err_msg = result.unwrap_err().to_string();
391        assert!(
392            err_msg.contains("dummy"),
393            "error should mention the language name"
394        );
395    }
396
397    #[test]
398    fn test_register_language_new_key_succeeds() {
399        use camel_language_api::LanguageError;
400        struct DummyLang;
401        impl camel_language_api::Language for DummyLang {
402            fn name(&self) -> &'static str {
403                "dummy"
404            }
405            fn create_expression(
406                &self,
407                _: &str,
408            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
409                Err(LanguageError::EvalError("not implemented".into()))
410            }
411            fn create_predicate(
412                &self,
413                _: &str,
414            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
415                Err(LanguageError::EvalError("not implemented".into()))
416            }
417        }
418
419        let mut ctx = CamelContext::new();
420        let result = ctx.register_language("dummy", Box::new(DummyLang));
421        assert!(result.is_ok(), "first registration should succeed");
422    }
423
424    #[test]
425    fn test_add_route_definition_uses_runtime_registered_language() {
426        use camel_language_api::{Expression, LanguageError, Predicate};
427
428        struct DummyExpression;
429        impl Expression for DummyExpression {
430            fn evaluate(
431                &self,
432                _exchange: &camel_api::Exchange,
433            ) -> Result<camel_api::Value, LanguageError> {
434                Ok(camel_api::Value::String("ok".into()))
435            }
436        }
437
438        struct DummyPredicate;
439        impl Predicate for DummyPredicate {
440            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
441                Ok(true)
442            }
443        }
444
445        struct RuntimeLang;
446        impl camel_language_api::Language for RuntimeLang {
447            fn name(&self) -> &'static str {
448                "runtime"
449            }
450
451            fn create_expression(
452                &self,
453                _script: &str,
454            ) -> Result<Box<dyn Expression>, LanguageError> {
455                Ok(Box::new(DummyExpression))
456            }
457
458            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
459                Ok(Box::new(DummyPredicate))
460            }
461        }
462
463        let mut ctx = CamelContext::new();
464        ctx.register_language("runtime", Box::new(RuntimeLang))
465            .unwrap();
466
467        let definition = RouteDefinition::new(
468            "timer:tick",
469            vec![BuilderStep::DeclarativeScript {
470                expression: LanguageExpressionDef {
471                    language: "runtime".into(),
472                    source: "${body}".into(),
473                },
474            }],
475        )
476        .with_route_id("runtime-lang-route");
477
478        let result = ctx.add_route_definition(definition);
479        assert!(
480            result.is_ok(),
481            "route should resolve runtime language: {result:?}"
482        );
483    }
484
485    #[test]
486    fn test_add_route_definition_fails_for_unregistered_runtime_language() {
487        let mut ctx = CamelContext::new();
488        let definition = RouteDefinition::new(
489            "timer:tick",
490            vec![BuilderStep::DeclarativeSetBody {
491                value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
492                    language: "missing-lang".into(),
493                    source: "${body}".into(),
494                }),
495            }],
496        )
497        .with_route_id("missing-runtime-lang-route");
498
499        let result = ctx.add_route_definition(definition);
500        assert!(
501            result.is_err(),
502            "route should fail when language is missing"
503        );
504        let error_text = result.unwrap_err().to_string();
505        assert!(
506            error_text.contains("missing-lang"),
507            "error should mention missing language, got: {error_text}"
508        );
509    }
510}