1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{
9 CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus, SupervisionConfig,
10};
11use camel_component::Component;
12use camel_language_api::Language;
13use camel_language_api::LanguageError;
14
15use crate::config::TracerConfig;
16use crate::registry::Registry;
17use crate::route::RouteDefinition;
18use crate::route_controller::{
19 DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
20};
21use crate::supervising_route_controller::SupervisingRouteController;
22
23pub struct CamelContext {
31 registry: Arc<std::sync::Mutex<Registry>>,
32 route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
33 cancel_token: CancellationToken,
34 metrics: Arc<dyn MetricsCollector>,
35 languages: SharedLanguageRegistry,
36 shutdown_timeout: std::time::Duration,
37}
38
39impl CamelContext {
40 fn built_in_languages() -> SharedLanguageRegistry {
41 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
42 languages.insert(
43 "simple".to_string(),
44 Arc::new(camel_language_simple::SimpleLanguage),
45 );
46 Arc::new(std::sync::Mutex::new(languages))
47 }
48
49 pub fn new() -> Self {
51 Self::with_metrics(Arc::new(NoOpMetrics))
52 }
53
54 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
56 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
57 let languages = Self::built_in_languages();
58 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
59 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
60 ));
61
62 controller
65 .try_lock()
66 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
67 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
68
69 Self {
70 registry,
71 route_controller: controller,
72 cancel_token: CancellationToken::new(),
73 metrics,
74 languages,
75 shutdown_timeout: std::time::Duration::from_secs(30),
76 }
77 }
78
79 pub fn with_supervision(config: SupervisionConfig) -> Self {
83 Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
84 }
85
86 pub fn with_supervision_and_metrics(
90 config: SupervisionConfig,
91 metrics: Arc<dyn MetricsCollector>,
92 ) -> Self {
93 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
94 let languages = Self::built_in_languages();
95 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
96 SupervisingRouteController::with_languages(
97 Arc::clone(®istry),
98 config,
99 Arc::clone(&languages),
100 )
101 .with_metrics(Arc::clone(&metrics)),
102 ));
103
104 controller
107 .try_lock()
108 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
109 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
110
111 Self {
112 registry,
113 route_controller: controller,
114 cancel_token: CancellationToken::new(),
115 metrics,
116 languages,
117 shutdown_timeout: std::time::Duration::from_secs(30),
118 }
119 }
120
121 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
123 self.route_controller
124 .try_lock()
125 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
126 .set_error_handler(config);
127 }
128
129 pub fn set_tracing(&mut self, enabled: bool) {
131 self.route_controller
132 .try_lock()
133 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
134 .set_tracer_config(&TracerConfig {
135 enabled,
136 ..Default::default()
137 });
138 }
139
140 pub fn set_tracer_config(&mut self, config: TracerConfig) {
142 self.route_controller
143 .try_lock()
144 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
145 .set_tracer_config(&config);
146 }
147
148 pub fn with_tracing(mut self) -> Self {
150 self.set_tracing(true);
151 self
152 }
153
154 pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
158 self.set_tracer_config(config);
159 self
160 }
161
162 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
164 info!(scheme = component.scheme(), "Registering component");
165 self.registry
166 .lock()
167 .expect("mutex poisoned: another thread panicked while holding this lock")
168 .register(component);
169 }
170
171 pub fn register_language(
177 &mut self,
178 name: impl Into<String>,
179 lang: Box<dyn Language>,
180 ) -> Result<(), LanguageError> {
181 let name = name.into();
182 let mut languages = self
183 .languages
184 .lock()
185 .expect("mutex poisoned: another thread panicked while holding this lock");
186 if languages.contains_key(&name) {
187 return Err(LanguageError::AlreadyRegistered(name));
188 }
189 languages.insert(name, Arc::from(lang));
190 Ok(())
191 }
192
193 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
195 let languages = self
196 .languages
197 .lock()
198 .expect("mutex poisoned: another thread panicked while holding this lock");
199 languages.get(name).cloned()
200 }
201
202 pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
206 info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
207
208 self.route_controller
209 .try_lock()
210 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
211 .add_route(definition)
212 }
213
214 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
216 self.registry
217 .lock()
218 .expect("mutex poisoned: another thread panicked while holding this lock")
219 }
220
221 pub fn route_controller(&self) -> &Arc<Mutex<dyn RouteControllerInternal>> {
223 &self.route_controller
224 }
225
226 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
228 Arc::clone(&self.metrics)
229 }
230
231 pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
233 self.route_controller
234 .try_lock()
235 .ok()?
236 .route_status(route_id)
237 }
238
239 pub async fn start(&mut self) -> Result<(), CamelError> {
244 info!("Starting CamelContext");
245
246 self.route_controller
247 .lock()
248 .await
249 .start_all_routes()
250 .await?;
251 info!("CamelContext started");
252 Ok(())
253 }
254
255 pub async fn stop(&mut self) -> Result<(), CamelError> {
257 self.stop_timeout(self.shutdown_timeout).await
258 }
259
260 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
265 info!("Stopping CamelContext");
266
267 self.cancel_token.cancel();
269
270 self.route_controller.lock().await.stop_all_routes().await?;
272
273 info!("CamelContext stopped");
274 Ok(())
275 }
276
277 pub fn shutdown_timeout(&self) -> std::time::Duration {
279 self.shutdown_timeout
280 }
281
282 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
284 self.shutdown_timeout = timeout;
285 }
286
287 pub async fn abort(&mut self) {
289 self.cancel_token.cancel();
290 let _ = self.route_controller.lock().await.stop_all_routes().await;
291 }
292}
293
294impl Default for CamelContext {
295 fn default() -> Self {
296 Self::new()
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::route::{BuilderStep, LanguageExpressionDef, RouteDefinition};
    use camel_api::CamelError;
    use camel_component::Endpoint;
    use camel_language_api::{Expression, Language, LanguageError, Predicate};

    /// Component stub registered under the `mock` scheme; it never yields an endpoint.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound("mock".to_string()))
        }
    }

    /// Language stub named `dummy` whose factories always fail; the
    /// registration tests only care about the name it is stored under.
    struct DummyLang;

    impl Language for DummyLang {
        fn name(&self) -> &'static str {
            "dummy"
        }

        fn create_expression(&self, _: &str) -> Result<Box<dyn Expression>, LanguageError> {
            Err(LanguageError::EvalError("not implemented".into()))
        }

        fn create_predicate(&self, _: &str) -> Result<Box<dyn Predicate>, LanguageError> {
            Err(LanguageError::EvalError("not implemented".into()))
        }
    }

    // NOTE(review): despite its name, this test never poisons the mutex; it
    // only checks that `registry()` does not panic on a healthy lock.
    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut ctx = CamelContext::new();
        ctx.register_component(MockComponent);

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = ctx.registry();
        }));

        assert!(
            outcome.is_ok(),
            "Registry access should handle mutex poisoning"
        );
    }

    #[test]
    fn test_context_resolves_simple_language() {
        let ctx = CamelContext::new();
        let simple = ctx
            .resolve_language("simple")
            .expect("simple language not found");
        assert_eq!(simple.name(), "simple");
    }

    #[test]
    fn test_simple_language_via_context() {
        let ctx = CamelContext::new();
        let simple = ctx.resolve_language("simple").unwrap();
        let predicate = simple.create_predicate("${header.x} == 'hello'").unwrap();

        let mut message = camel_api::message::Message::default();
        message.set_header("x", camel_api::Value::String("hello".into()));
        let exchange = camel_api::exchange::Exchange::new(message);

        assert!(predicate.matches(&exchange).unwrap());
    }

    #[test]
    fn test_resolve_unknown_language_returns_none() {
        let ctx = CamelContext::new();
        assert!(ctx.resolve_language("nonexistent").is_none());
    }

    #[test]
    fn test_register_language_duplicate_returns_error() {
        let mut ctx = CamelContext::new();
        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();

        let second = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(second.is_err(), "duplicate registration should fail");

        let message = second.unwrap_err().to_string();
        assert!(
            message.contains("dummy"),
            "error should mention the language name"
        );
    }

    #[test]
    fn test_register_language_new_key_succeeds() {
        let mut ctx = CamelContext::new();
        let first = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(first.is_ok(), "first registration should succeed");
    }

    #[test]
    fn test_add_route_definition_uses_runtime_registered_language() {
        /// Expression that always evaluates to the string `ok`.
        struct DummyExpression;
        impl Expression for DummyExpression {
            fn evaluate(
                &self,
                _exchange: &camel_api::Exchange,
            ) -> Result<camel_api::Value, LanguageError> {
                Ok(camel_api::Value::String("ok".into()))
            }
        }

        /// Predicate that always matches.
        struct DummyPredicate;
        impl Predicate for DummyPredicate {
            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
                Ok(true)
            }
        }

        /// Language registered at runtime under the name `runtime`.
        struct RuntimeLang;
        impl Language for RuntimeLang {
            fn name(&self) -> &'static str {
                "runtime"
            }

            fn create_expression(
                &self,
                _script: &str,
            ) -> Result<Box<dyn Expression>, LanguageError> {
                Ok(Box::new(DummyExpression))
            }

            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
                Ok(Box::new(DummyPredicate))
            }
        }

        let mut ctx = CamelContext::new();
        ctx.register_language("runtime", Box::new(RuntimeLang))
            .unwrap();

        let script_step = BuilderStep::DeclarativeScript {
            expression: LanguageExpressionDef {
                language: "runtime".into(),
                source: "${body}".into(),
            },
        };
        let definition =
            RouteDefinition::new("timer:tick", vec![script_step]).with_route_id("runtime-lang-route");

        let result = ctx.add_route_definition(definition);
        assert!(
            result.is_ok(),
            "route should resolve runtime language: {result:?}"
        );
    }

    #[test]
    fn test_add_route_definition_fails_for_unregistered_runtime_language() {
        let mut ctx = CamelContext::new();

        let set_body_step = BuilderStep::DeclarativeSetBody {
            value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
                language: "missing-lang".into(),
                source: "${body}".into(),
            }),
        };
        let definition = RouteDefinition::new("timer:tick", vec![set_body_step])
            .with_route_id("missing-runtime-lang-route");

        let result = ctx.add_route_definition(definition);
        assert!(
            result.is_err(),
            "route should fail when language is missing"
        );

        let error_text = result.unwrap_err().to_string();
        assert!(
            error_text.contains("missing-lang"),
            "error should mention missing language, got: {error_text}"
        );
    }
}