// camel_core/context.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus};
9use camel_component::Component;
10use camel_language_api::Language;
11use camel_language_api::LanguageError;
12
13use crate::registry::Registry;
14use crate::route::RouteDefinition;
15use crate::route_controller::DefaultRouteController;
16
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Component registry behind a blocking `std::sync::Mutex`; a clone of this
    /// `Arc` is passed to the route controller when the context is built.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Route controller behind a `tokio::sync::Mutex`: the async methods
    /// `.lock().await` it, the synchronous methods use `try_lock()`.
    route_controller: Arc<Mutex<DefaultRouteController>>,
    /// Token cancelled by `stop_timeout()` and `abort()`.
    cancel_token: CancellationToken,
    /// Metrics collector supplied at construction (`NoOpMetrics` via `new()`).
    metrics: Arc<dyn MetricsCollector>,
    /// Languages keyed by name; `"simple"` is inserted at construction.
    languages: HashMap<String, Box<dyn Language>>,
}
31
32impl CamelContext {
33    /// Create a new, empty CamelContext.
34    pub fn new() -> Self {
35        Self::with_metrics(Arc::new(NoOpMetrics))
36    }
37
38    /// Create a new CamelContext with a custom metrics collector.
39    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
40        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
41        let controller = Arc::new(Mutex::new(DefaultRouteController::new(Arc::clone(
42            &registry,
43        ))));
44
45        // Set self-ref so DefaultRouteController can create ProducerContext
46        // Use try_lock since we just created it and nobody else has access yet
47        controller
48            .try_lock()
49            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
50            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
51
52        // Pre-register built-in languages
53        let mut languages: HashMap<String, Box<dyn Language>> = HashMap::new();
54        languages.insert(
55            "simple".to_string(),
56            Box::new(camel_language_simple::SimpleLanguage),
57        );
58
59        Self {
60            registry,
61            route_controller: controller,
62            cancel_token: CancellationToken::new(),
63            metrics,
64            languages,
65        }
66    }
67
68    /// Set a global error handler applied to all routes without a per-route handler.
69    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
70        self.route_controller
71            .try_lock()
72            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
73            .set_error_handler(config);
74    }
75
76    /// Register a component with this context.
77    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
78        info!(scheme = component.scheme(), "Registering component");
79        self.registry
80            .lock()
81            .expect("mutex poisoned: another thread panicked while holding this lock")
82            .register(component);
83    }
84
85    /// Register a language with this context, keyed by name.
86    ///
87    /// Returns `Err(LanguageError::AlreadyRegistered)` if a language with the
88    /// same name is already registered. Use [`resolve_language`](Self::resolve_language)
89    /// to check before registering, or choose a distinct name.
90    pub fn register_language(
91        &mut self,
92        name: impl Into<String>,
93        lang: Box<dyn Language>,
94    ) -> Result<(), LanguageError> {
95        let name = name.into();
96        if self.languages.contains_key(&name) {
97            return Err(LanguageError::AlreadyRegistered(name));
98        }
99        self.languages.insert(name, lang);
100        Ok(())
101    }
102
103    /// Resolve a language by name. Returns `None` if not registered.
104    pub fn resolve_language(&self, name: &str) -> Option<&dyn Language> {
105        self.languages.get(name).map(|l| l.as_ref())
106    }
107
108    /// Add a route definition to this context.
109    ///
110    /// The route must have an ID. Steps are resolved immediately using registered components.
111    pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
112        info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
113
114        self.route_controller
115            .try_lock()
116            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
117            .add_route(definition)
118    }
119
120    /// Access the component registry.
121    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
122        self.registry
123            .lock()
124            .expect("mutex poisoned: another thread panicked while holding this lock")
125    }
126
127    /// Access the route controller.
128    pub fn route_controller(&self) -> &Arc<Mutex<DefaultRouteController>> {
129        &self.route_controller
130    }
131
132    /// Get the metrics collector.
133    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
134        Arc::clone(&self.metrics)
135    }
136
137    /// Get the status of a route by ID.
138    pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
139        self.route_controller
140            .try_lock()
141            .ok()?
142            .route_status(route_id)
143    }
144
145    /// Start all routes. Each route's consumer will begin producing exchanges.
146    ///
147    /// Only routes with `auto_startup == true` will be started, in order of their
148    /// `startup_order` (lower values start first).
149    pub async fn start(&mut self) -> Result<(), CamelError> {
150        info!("Starting CamelContext");
151
152        self.route_controller
153            .lock()
154            .await
155            .start_all_routes()
156            .await?;
157        info!("CamelContext started");
158        Ok(())
159    }
160
161    /// Graceful shutdown with default 30-second timeout.
162    pub async fn stop(&mut self) -> Result<(), CamelError> {
163        self.stop_timeout(std::time::Duration::from_secs(30)).await
164    }
165
166    /// Graceful shutdown with custom timeout.
167    ///
168    /// Note: The timeout parameter is currently not used directly; the RouteController
169    /// manages its own shutdown timeout. This may change in a future version.
170    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
171        info!("Stopping CamelContext");
172
173        // Signal cancellation (for any legacy code that might use it)
174        self.cancel_token.cancel();
175
176        // Stop all routes via the controller
177        self.route_controller.lock().await.stop_all_routes().await?;
178
179        info!("CamelContext stopped");
180        Ok(())
181    }
182
183    /// Immediate abort — kills all tasks without draining.
184    pub async fn abort(&mut self) {
185        self.cancel_token.cancel();
186        let _ = self.route_controller.lock().await.stop_all_routes().await;
187    }
188}
189
190impl Default for CamelContext {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::CamelError;
    use camel_component::Endpoint;

    /// Mock component for testing: reports the `"mock"` scheme and refuses to
    /// create endpoints.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound("mock".to_string()))
        }
    }

    /// Language stub shared by the registration tests. Only its identity
    /// matters; both factory methods always fail.
    struct DummyLang;

    impl camel_language_api::Language for DummyLang {
        fn name(&self) -> &'static str {
            "dummy"
        }

        fn create_expression(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Expression>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }

        fn create_predicate(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Predicate>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }
    }

    /// Checks that the registry can be locked without panicking after a
    /// component was registered.
    ///
    /// NOTE(review): despite its name, this test never poisons the mutex, and
    /// `registry()` panics on a poisoned lock — consider renaming the test or
    /// adding a case that really poisons the registry.
    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut ctx = CamelContext::new();

        // Register a component successfully
        ctx.register_component(MockComponent);

        // Locking the (unpoisoned) registry must not panic.
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = ctx.registry();
        }));

        assert!(
            result.is_ok(),
            "Registry access should handle mutex poisoning"
        );
    }

    /// The built-in "simple" language is available on a fresh context.
    #[test]
    fn test_context_resolves_simple_language() {
        let ctx = CamelContext::new();
        let lang = ctx
            .resolve_language("simple")
            .expect("simple language not found");
        assert_eq!(lang.name(), "simple");
    }

    /// A predicate built by the resolved "simple" language matches an exchange
    /// whose header `x` equals `hello`.
    #[test]
    fn test_simple_language_via_context() {
        let ctx = CamelContext::new();
        let lang = ctx.resolve_language("simple").unwrap();
        let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
        let mut msg = camel_api::message::Message::default();
        msg.set_header("x", camel_api::Value::String("hello".into()));
        let ex = camel_api::exchange::Exchange::new(msg);
        assert!(pred.matches(&ex).unwrap());
    }

    /// Looking up a name that was never registered yields `None`.
    #[test]
    fn test_resolve_unknown_language_returns_none() {
        let ctx = CamelContext::new();
        assert!(ctx.resolve_language("nonexistent").is_none());
    }

    /// Registering the same name twice fails, and the error names the language.
    #[test]
    fn test_register_language_duplicate_returns_error() {
        let mut ctx = CamelContext::new();
        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
        let result = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(result.is_err(), "duplicate registration should fail");
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("dummy"),
            "error should mention the language name"
        );
    }

    /// Registering under an unused name succeeds.
    #[test]
    fn test_register_language_new_key_succeeds() {
        let mut ctx = CamelContext::new();
        let result = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(result.is_ok(), "first registration should succeed");
    }
}