1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::{info, warn};
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{
9 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
10 RouteController, RouteStatus, ServiceHealth, ServiceStatus, SupervisionConfig,
11};
12use camel_component::Component;
13use camel_language_api::Language;
14use camel_language_api::LanguageError;
15
16use crate::config::TracerConfig;
17use crate::registry::Registry;
18use crate::route::RouteDefinition;
19use crate::route_controller::{
20 DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
21};
22use crate::supervising_route_controller::SupervisingRouteController;
23
/// Central runtime object: owns the component registry, the route controller,
/// the language registry and any lifecycle services attached to the context.
pub struct CamelContext {
    /// Component registry; the constructors hand a clone of this `Arc` to the route controller.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Route controller behind an async (tokio) mutex. The controller also receives a
    /// clone of this handle through `set_self_ref` at construction time.
    route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
    /// Token cancelled by `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics sink; replaced by `with_lifecycle` when a service exposes a collector.
    metrics: Arc<dyn MetricsCollector>,
    /// Expression languages by name; the constructors share this handle with the route controller.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop` forwards to `stop_timeout` (the constructors set 30 seconds).
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services in the order they were added via `with_lifecycle`.
    services: Vec<Box<dyn Lifecycle>>,
}
40
41impl CamelContext {
42 fn built_in_languages() -> SharedLanguageRegistry {
43 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
44 languages.insert(
45 "simple".to_string(),
46 Arc::new(camel_language_simple::SimpleLanguage),
47 );
48 Arc::new(std::sync::Mutex::new(languages))
49 }
50
51 pub fn new() -> Self {
53 Self::with_metrics(Arc::new(NoOpMetrics))
54 }
55
56 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
58 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
59 let languages = Self::built_in_languages();
60 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
61 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
62 ));
63
64 controller
67 .try_lock()
68 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
69 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
70
71 Self {
72 registry,
73 route_controller: controller,
74 cancel_token: CancellationToken::new(),
75 metrics,
76 languages,
77 shutdown_timeout: std::time::Duration::from_secs(30),
78 services: Vec::new(),
79 }
80 }
81
82 pub fn with_supervision(config: SupervisionConfig) -> Self {
86 Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
87 }
88
89 pub fn with_supervision_and_metrics(
93 config: SupervisionConfig,
94 metrics: Arc<dyn MetricsCollector>,
95 ) -> Self {
96 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
97 let languages = Self::built_in_languages();
98 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
99 SupervisingRouteController::with_languages(
100 Arc::clone(®istry),
101 config,
102 Arc::clone(&languages),
103 )
104 .with_metrics(Arc::clone(&metrics)),
105 ));
106
107 controller
110 .try_lock()
111 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
112 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
113
114 Self {
115 registry,
116 route_controller: controller,
117 cancel_token: CancellationToken::new(),
118 metrics,
119 languages,
120 shutdown_timeout: std::time::Duration::from_secs(30),
121 services: Vec::new(),
122 }
123 }
124
125 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
127 self.route_controller
128 .try_lock()
129 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
130 .set_error_handler(config);
131 }
132
133 pub fn set_tracing(&mut self, enabled: bool) {
135 self.route_controller
136 .try_lock()
137 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
138 .set_tracer_config(&TracerConfig {
139 enabled,
140 ..Default::default()
141 });
142 }
143
144 pub fn set_tracer_config(&mut self, config: TracerConfig) {
146 self.route_controller
147 .try_lock()
148 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
149 .set_tracer_config(&config);
150 }
151
152 pub fn with_tracing(mut self) -> Self {
154 self.set_tracing(true);
155 self
156 }
157
158 pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
162 self.set_tracer_config(config);
163 self
164 }
165
166 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
168 if let Some(collector) = service.as_metrics_collector() {
170 self.metrics = collector;
171 }
172
173 self.services.push(Box::new(service));
174 self
175 }
176
177 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
179 info!(scheme = component.scheme(), "Registering component");
180 self.registry
181 .lock()
182 .expect("mutex poisoned: another thread panicked while holding this lock")
183 .register(component);
184 }
185
186 pub fn register_language(
192 &mut self,
193 name: impl Into<String>,
194 lang: Box<dyn Language>,
195 ) -> Result<(), LanguageError> {
196 let name = name.into();
197 let mut languages = self
198 .languages
199 .lock()
200 .expect("mutex poisoned: another thread panicked while holding this lock");
201 if languages.contains_key(&name) {
202 return Err(LanguageError::AlreadyRegistered(name));
203 }
204 languages.insert(name, Arc::from(lang));
205 Ok(())
206 }
207
208 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
210 let languages = self
211 .languages
212 .lock()
213 .expect("mutex poisoned: another thread panicked while holding this lock");
214 languages.get(name).cloned()
215 }
216
217 pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
221 info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
222
223 self.route_controller
224 .try_lock()
225 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
226 .add_route(definition)
227 }
228
229 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
231 self.registry
232 .lock()
233 .expect("mutex poisoned: another thread panicked while holding this lock")
234 }
235
236 pub fn route_controller(&self) -> &Arc<Mutex<dyn RouteControllerInternal>> {
238 &self.route_controller
239 }
240
241 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
243 Arc::clone(&self.metrics)
244 }
245
246 pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
248 self.route_controller
249 .try_lock()
250 .ok()?
251 .route_status(route_id)
252 }
253
254 pub async fn start(&mut self) -> Result<(), CamelError> {
259 info!("Starting CamelContext");
260
261 for (i, service) in self.services.iter_mut().enumerate() {
263 info!("Starting service: {}", service.name());
264 if let Err(e) = service.start().await {
265 warn!(
267 "Service {} failed to start, rolling back {} services",
268 service.name(),
269 i
270 );
271 for j in (0..i).rev() {
272 if let Err(rollback_err) = self.services[j].stop().await {
273 warn!(
274 "Failed to stop service {} during rollback: {}",
275 self.services[j].name(),
276 rollback_err
277 );
278 }
279 }
280 return Err(e);
281 }
282 }
283
284 self.route_controller
286 .lock()
287 .await
288 .start_all_routes()
289 .await?;
290
291 info!("CamelContext started");
292 Ok(())
293 }
294
295 pub async fn stop(&mut self) -> Result<(), CamelError> {
297 self.stop_timeout(self.shutdown_timeout).await
298 }
299
300 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
305 info!("Stopping CamelContext");
306
307 self.cancel_token.cancel();
309
310 self.route_controller.lock().await.stop_all_routes().await?;
312
313 let mut first_error = None;
316 for service in &mut self.services {
317 info!("Stopping service: {}", service.name());
318 if let Err(e) = service.stop().await {
319 warn!("Service {} failed to stop: {}", service.name(), e);
320 if first_error.is_none() {
321 first_error = Some(e);
322 }
323 }
324 }
325
326 info!("CamelContext stopped");
327
328 if let Some(e) = first_error {
329 Err(e)
330 } else {
331 Ok(())
332 }
333 }
334
335 pub fn shutdown_timeout(&self) -> std::time::Duration {
337 self.shutdown_timeout
338 }
339
340 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
342 self.shutdown_timeout = timeout;
343 }
344
345 pub async fn abort(&mut self) {
347 self.cancel_token.cancel();
348 let _ = self.route_controller.lock().await.stop_all_routes().await;
349 }
350
351 pub fn health_check(&self) -> HealthReport {
353 let services: Vec<ServiceHealth> = self
354 .services
355 .iter()
356 .map(|s| ServiceHealth {
357 name: s.name().to_string(),
358 status: s.status(),
359 })
360 .collect();
361
362 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
363 HealthStatus::Healthy
364 } else {
365 HealthStatus::Unhealthy
366 };
367
368 HealthReport {
369 status,
370 services,
371 ..Default::default()
372 }
373 }
374}
375
376impl Default for CamelContext {
377 fn default() -> Self {
378 Self::new()
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::route::{BuilderStep, LanguageExpressionDef, RouteDefinition};
    use camel_api::CamelError;
    use camel_component::Endpoint;

    /// Component stub registered under the `mock` scheme; it never produces an endpoint.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound("mock".to_string()))
        }
    }

    /// Language stub named `dummy` whose factory methods always fail.
    ///
    /// Shared by the registration tests, which only exercise the registry key handling.
    struct DummyLang;

    impl camel_language_api::Language for DummyLang {
        fn name(&self) -> &'static str {
            "dummy"
        }

        fn create_expression(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Expression>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }

        fn create_predicate(
            &self,
            _: &str,
        ) -> Result<Box<dyn camel_language_api::Predicate>, camel_language_api::LanguageError>
        {
            Err(camel_language_api::LanguageError::EvalError(
                "not implemented".into(),
            ))
        }
    }

    // NOTE: despite its name this test never poisons the mutex; it only checks that
    // taking the registry guard after a registration does not panic.
    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut ctx = CamelContext::new();

        ctx.register_component(MockComponent);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = ctx.registry();
        }));

        assert!(
            result.is_ok(),
            "Registry access should handle mutex poisoning"
        );
    }

    #[test]
    fn test_context_resolves_simple_language() {
        let ctx = CamelContext::new();
        let lang = ctx
            .resolve_language("simple")
            .expect("simple language not found");
        assert_eq!(lang.name(), "simple");
    }

    #[test]
    fn test_simple_language_via_context() {
        let ctx = CamelContext::new();
        let lang = ctx.resolve_language("simple").unwrap();
        let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
        let mut msg = camel_api::message::Message::default();
        msg.set_header("x", camel_api::Value::String("hello".into()));
        let ex = camel_api::exchange::Exchange::new(msg);
        assert!(pred.matches(&ex).unwrap());
    }

    #[test]
    fn test_resolve_unknown_language_returns_none() {
        let ctx = CamelContext::new();
        assert!(ctx.resolve_language("nonexistent").is_none());
    }

    #[test]
    fn test_register_language_duplicate_returns_error() {
        let mut ctx = CamelContext::new();
        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
        let result = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(result.is_err(), "duplicate registration should fail");
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("dummy"),
            "error should mention the language name"
        );
    }

    #[test]
    fn test_register_language_new_key_succeeds() {
        let mut ctx = CamelContext::new();
        let result = ctx.register_language("dummy", Box::new(DummyLang));
        assert!(result.is_ok(), "first registration should succeed");
    }

    #[test]
    fn test_add_route_definition_uses_runtime_registered_language() {
        use camel_language_api::{Expression, LanguageError, Predicate};

        /// Expression that evaluates to the string `ok` for every exchange.
        struct DummyExpression;
        impl Expression for DummyExpression {
            fn evaluate(
                &self,
                _exchange: &camel_api::Exchange,
            ) -> Result<camel_api::Value, LanguageError> {
                Ok(camel_api::Value::String("ok".into()))
            }
        }

        /// Predicate that matches every exchange.
        struct DummyPredicate;
        impl Predicate for DummyPredicate {
            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
                Ok(true)
            }
        }

        /// Working language that is only registered at runtime by this test.
        struct RuntimeLang;
        impl camel_language_api::Language for RuntimeLang {
            fn name(&self) -> &'static str {
                "runtime"
            }

            fn create_expression(
                &self,
                _script: &str,
            ) -> Result<Box<dyn Expression>, LanguageError> {
                Ok(Box::new(DummyExpression))
            }

            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
                Ok(Box::new(DummyPredicate))
            }
        }

        let mut ctx = CamelContext::new();
        ctx.register_language("runtime", Box::new(RuntimeLang))
            .unwrap();

        let definition = RouteDefinition::new(
            "timer:tick",
            vec![BuilderStep::DeclarativeScript {
                expression: LanguageExpressionDef {
                    language: "runtime".into(),
                    source: "${body}".into(),
                },
            }],
        )
        .with_route_id("runtime-lang-route");

        let result = ctx.add_route_definition(definition);
        assert!(
            result.is_ok(),
            "route should resolve runtime language: {result:?}"
        );
    }

    #[test]
    fn test_add_route_definition_fails_for_unregistered_runtime_language() {
        let mut ctx = CamelContext::new();
        let definition = RouteDefinition::new(
            "timer:tick",
            vec![BuilderStep::DeclarativeSetBody {
                value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
                    language: "missing-lang".into(),
                    source: "${body}".into(),
                }),
            }],
        )
        .with_route_id("missing-runtime-lang-route");

        let result = ctx.add_route_definition(definition);
        assert!(
            result.is_err(),
            "route should fail when language is missing"
        );
        let error_text = result.unwrap_err().to_string();
        assert!(
            error_text.contains("missing-lang"),
            "error should mention missing language, got: {error_text}"
        );
    }

    #[test]
    fn test_health_check_empty_context() {
        let ctx = CamelContext::new();
        let report = ctx.health_check();

        assert_eq!(report.status, HealthStatus::Healthy);
        assert!(report.services.is_empty());
    }
}
602
#[cfg(test)]
mod lifecycle_tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::Lifecycle;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Lifecycle stub that always succeeds and counts its `start` / `stop` calls.
    struct MockService {
        start_count: Arc<AtomicUsize>,
        stop_count: Arc<AtomicUsize>,
    }

    impl MockService {
        /// Returns the service together with handles to its start and stop counters.
        fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let starts = Arc::new(AtomicUsize::new(0));
            let stops = Arc::new(AtomicUsize::new(0));
            let service = Self {
                start_count: Arc::clone(&starts),
                stop_count: Arc::clone(&stops),
            };
            (service, starts, stops)
        }
    }

    #[async_trait]
    impl Lifecycle for MockService {
        fn name(&self) -> &str {
            "mock"
        }

        async fn start(&mut self) -> Result<(), CamelError> {
            self.start_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            self.stop_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_context_starts_lifecycle_services() {
        let (service, starts, stops) = MockService::new();
        let mut ctx = CamelContext::new().with_lifecycle(service);

        // Attaching the service must not start it.
        assert_eq!(starts.load(Ordering::SeqCst), 0);

        ctx.start().await.unwrap();
        assert_eq!(starts.load(Ordering::SeqCst), 1);
        assert_eq!(stops.load(Ordering::SeqCst), 0);

        ctx.stop().await.unwrap();
        assert_eq!(stops.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_service_start_failure_rollback() {
        /// Counting lifecycle stub whose `start` fails when `should_fail` is set.
        struct FailingService {
            start_count: Arc<AtomicUsize>,
            stop_count: Arc<AtomicUsize>,
            should_fail: bool,
        }

        #[async_trait]
        impl Lifecycle for FailingService {
            fn name(&self) -> &str {
                "failing"
            }

            async fn start(&mut self) -> Result<(), CamelError> {
                self.start_count.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    return Err(CamelError::ProcessorError("intentional failure".into()));
                }
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                self.stop_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }

        // One (start, stop) counter pair per service; only the middle service fails.
        let counters: Vec<(Arc<AtomicUsize>, Arc<AtomicUsize>)> = (0..3)
            .map(|_| (Arc::new(AtomicUsize::new(0)), Arc::new(AtomicUsize::new(0))))
            .collect();

        let mut ctx = CamelContext::new();
        for (idx, (starts, stops)) in counters.iter().enumerate() {
            ctx = ctx.with_lifecycle(FailingService {
                start_count: Arc::clone(starts),
                stop_count: Arc::clone(stops),
                should_fail: idx == 1,
            });
        }

        let result = ctx.start().await;
        assert!(result.is_err());

        let observed: Vec<(usize, usize)> = counters
            .iter()
            .map(|(starts, stops)| (starts.load(Ordering::SeqCst), stops.load(Ordering::SeqCst)))
            .collect();

        // service 1: started, then rolled back; service 2: start attempted, never
        // stopped; service 3: never reached.
        assert_eq!(observed, vec![(1, 1), (1, 0), (0, 0)]);
    }
}