1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus};
9use camel_component::Component;
10use camel_language_api::Language;
11use camel_language_api::LanguageError;
12
13use crate::registry::Registry;
14use crate::route::RouteDefinition;
15use crate::route_controller::DefaultRouteController;
16
17pub struct CamelContext {
25 registry: Arc<std::sync::Mutex<Registry>>,
26 route_controller: Arc<Mutex<DefaultRouteController>>,
27 cancel_token: CancellationToken,
28 metrics: Arc<dyn MetricsCollector>,
29 languages: HashMap<String, Box<dyn Language>>,
30}
31
32impl CamelContext {
33 pub fn new() -> Self {
35 Self::with_metrics(Arc::new(NoOpMetrics))
36 }
37
38 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
40 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
41 let controller = Arc::new(Mutex::new(DefaultRouteController::new(Arc::clone(
42 ®istry,
43 ))));
44
45 controller
48 .try_lock()
49 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
50 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
51
52 let mut languages: HashMap<String, Box<dyn Language>> = HashMap::new();
54 languages.insert(
55 "simple".to_string(),
56 Box::new(camel_language_simple::SimpleLanguage),
57 );
58
59 Self {
60 registry,
61 route_controller: controller,
62 cancel_token: CancellationToken::new(),
63 metrics,
64 languages,
65 }
66 }
67
68 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
70 self.route_controller
71 .try_lock()
72 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
73 .set_error_handler(config);
74 }
75
76 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
78 info!(scheme = component.scheme(), "Registering component");
79 self.registry
80 .lock()
81 .expect("mutex poisoned: another thread panicked while holding this lock")
82 .register(component);
83 }
84
85 pub fn register_language(
91 &mut self,
92 name: impl Into<String>,
93 lang: Box<dyn Language>,
94 ) -> Result<(), LanguageError> {
95 let name = name.into();
96 if self.languages.contains_key(&name) {
97 return Err(LanguageError::AlreadyRegistered(name));
98 }
99 self.languages.insert(name, lang);
100 Ok(())
101 }
102
103 pub fn resolve_language(&self, name: &str) -> Option<&dyn Language> {
105 self.languages.get(name).map(|l| l.as_ref())
106 }
107
108 pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
112 info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
113
114 self.route_controller
115 .try_lock()
116 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
117 .add_route(definition)
118 }
119
120 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
122 self.registry
123 .lock()
124 .expect("mutex poisoned: another thread panicked while holding this lock")
125 }
126
127 pub fn route_controller(&self) -> &Arc<Mutex<DefaultRouteController>> {
129 &self.route_controller
130 }
131
132 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
134 Arc::clone(&self.metrics)
135 }
136
137 pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
139 self.route_controller
140 .try_lock()
141 .ok()?
142 .route_status(route_id)
143 }
144
145 pub async fn start(&mut self) -> Result<(), CamelError> {
150 info!("Starting CamelContext");
151
152 self.route_controller
153 .lock()
154 .await
155 .start_all_routes()
156 .await?;
157 info!("CamelContext started");
158 Ok(())
159 }
160
161 pub async fn stop(&mut self) -> Result<(), CamelError> {
163 self.stop_timeout(std::time::Duration::from_secs(30)).await
164 }
165
166 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
171 info!("Stopping CamelContext");
172
173 self.cancel_token.cancel();
175
176 self.route_controller.lock().await.stop_all_routes().await?;
178
179 info!("CamelContext stopped");
180 Ok(())
181 }
182
183 pub async fn abort(&mut self) {
185 self.cancel_token.cancel();
186 let _ = self.route_controller.lock().await.stop_all_routes().await;
187 }
188}
189
190impl Default for CamelContext {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::CamelError;
    use camel_component::Endpoint;

    /// Component stub with scheme `"mock"`; endpoint creation always fails.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound("mock".to_string()))
        }
    }

    /// Language stub named `"dummy"`; both factory methods return an
    /// `EvalError`. Shared by the registration tests below.
    struct DummyLang;

    impl camel_language_api::Language for DummyLang {
        fn name(&self) -> &'static str {
            "dummy"
        }

        fn create_expression(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Expression>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }

        fn create_predicate(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Predicate>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }
    }

    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut context = CamelContext::new();
        context.register_component(MockComponent);

        // Nothing in this test poisons the mutex: it only checks that taking
        // the registry lock after a registration does not panic.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = context.registry();
        }));

        assert!(
            outcome.is_ok(),
            "Registry access should handle mutex poisoning"
        );
    }

    #[test]
    fn test_context_resolves_simple_language() {
        let context = CamelContext::new();
        let simple = context
            .resolve_language("simple")
            .expect("simple language not found");
        assert_eq!(simple.name(), "simple");
    }

    #[test]
    fn test_simple_language_via_context() {
        let context = CamelContext::new();

        // Build an exchange whose message carries header x = "hello".
        let mut message = camel_api::message::Message::default();
        message.set_header("x", camel_api::Value::String("hello".into()));
        let exchange = camel_api::exchange::Exchange::new(message);

        let simple = context.resolve_language("simple").unwrap();
        let predicate = simple.create_predicate("${header.x} == 'hello'").unwrap();
        assert!(predicate.matches(&exchange).unwrap());
    }

    #[test]
    fn test_resolve_unknown_language_returns_none() {
        let context = CamelContext::new();
        assert!(context.resolve_language("nonexistent").is_none());
    }

    #[test]
    fn test_register_language_duplicate_returns_error() {
        let mut context = CamelContext::new();
        context
            .register_language("dummy", Box::new(DummyLang))
            .unwrap();

        // Registering the same name a second time must be rejected.
        let second = context.register_language("dummy", Box::new(DummyLang));
        assert!(second.is_err(), "duplicate registration should fail");

        let rendered = second.unwrap_err().to_string();
        assert!(
            rendered.contains("dummy"),
            "error should mention the language name"
        );
    }

    #[test]
    fn test_register_language_new_key_succeeds() {
        let mut context = CamelContext::new();
        let first = context.register_language("dummy", Box::new(DummyLang));
        assert!(first.is_ok(), "first registration should succeed");
    }
}