Skip to main content

camel_core/
context.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::{info, warn};
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{
9    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
10    RouteController, RouteStatus, ServiceHealth, ServiceStatus, SupervisionConfig,
11};
12use camel_component::Component;
13use camel_language_api::Language;
14use camel_language_api::LanguageError;
15
16use crate::config::TracerConfig;
17use crate::registry::Registry;
18use crate::route::RouteDefinition;
19use crate::route_controller::{
20    DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
21};
22use crate::supervising_route_controller::SupervisingRouteController;
23
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Component registry; a clone of this handle is given to the route controller.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Route controller (default or supervising) behind an async mutex.
    route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
    /// Token cancelled by `stop_timeout()` and `abort()`.
    cancel_token: CancellationToken,
    /// Metrics collector returned by `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Language registry keyed by name; a clone of this handle is given to the route controller.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop()` forwards to `stop_timeout()`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services started by `start()` and stopped by `stop_timeout()`.
    services: Vec<Box<dyn Lifecycle>>,
}
40
41impl CamelContext {
42    fn built_in_languages() -> SharedLanguageRegistry {
43        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
44        languages.insert(
45            "simple".to_string(),
46            Arc::new(camel_language_simple::SimpleLanguage),
47        );
48        Arc::new(std::sync::Mutex::new(languages))
49    }
50
51    /// Create a new, empty CamelContext.
52    pub fn new() -> Self {
53        Self::with_metrics(Arc::new(NoOpMetrics))
54    }
55
56    /// Create a new CamelContext with a custom metrics collector.
57    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
58        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
59        let languages = Self::built_in_languages();
60        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
61            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
62        ));
63
64        // Set self-ref so DefaultRouteController can create ProducerContext
65        // Use try_lock since we just created it and nobody else has access yet
66        controller
67            .try_lock()
68            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
69            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
70
71        Self {
72            registry,
73            route_controller: controller,
74            cancel_token: CancellationToken::new(),
75            metrics,
76            languages,
77            shutdown_timeout: std::time::Duration::from_secs(30),
78            services: Vec::new(),
79        }
80    }
81
82    /// Create a new CamelContext with route supervision enabled.
83    ///
84    /// The supervision config controls automatic restart behavior for crashed routes.
85    pub fn with_supervision(config: SupervisionConfig) -> Self {
86        Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
87    }
88
89    /// Create a new CamelContext with route supervision and custom metrics.
90    ///
91    /// The supervision config controls automatic restart behavior for crashed routes.
92    pub fn with_supervision_and_metrics(
93        config: SupervisionConfig,
94        metrics: Arc<dyn MetricsCollector>,
95    ) -> Self {
96        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
97        let languages = Self::built_in_languages();
98        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
99            SupervisingRouteController::with_languages(
100                Arc::clone(&registry),
101                config,
102                Arc::clone(&languages),
103            )
104            .with_metrics(Arc::clone(&metrics)),
105        ));
106
107        // Set self-ref so SupervisingRouteController can create ProducerContext
108        // Use try_lock since we just created it and nobody else has access yet
109        controller
110            .try_lock()
111            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
112            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
113
114        Self {
115            registry,
116            route_controller: controller,
117            cancel_token: CancellationToken::new(),
118            metrics,
119            languages,
120            shutdown_timeout: std::time::Duration::from_secs(30),
121            services: Vec::new(),
122        }
123    }
124
125    /// Set a global error handler applied to all routes without a per-route handler.
126    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
127        self.route_controller
128            .try_lock()
129            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
130            .set_error_handler(config);
131    }
132
133    /// Enable or disable tracing globally.
134    pub fn set_tracing(&mut self, enabled: bool) {
135        self.route_controller
136            .try_lock()
137            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
138            .set_tracer_config(&TracerConfig {
139                enabled,
140                ..Default::default()
141            });
142    }
143
144    /// Configure tracing with full config.
145    pub fn set_tracer_config(&mut self, config: TracerConfig) {
146        self.route_controller
147            .try_lock()
148            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
149            .set_tracer_config(&config);
150    }
151
152    /// Builder-style: enable tracing with default config.
153    pub fn with_tracing(mut self) -> Self {
154        self.set_tracing(true);
155        self
156    }
157
158    /// Builder-style: configure tracing with custom config.
159    /// Note: tracing subscriber initialization (stdout/file output) is handled
160    /// separately via init_tracing_subscriber (called in camel-config bridge).
161    pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
162        self.set_tracer_config(config);
163        self
164    }
165
166    /// Register a lifecycle service (Apache Camel: addService pattern)
167    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
168        // Auto-register MetricsCollector if available
169        if let Some(collector) = service.as_metrics_collector() {
170            self.metrics = collector;
171        }
172
173        self.services.push(Box::new(service));
174        self
175    }
176
177    /// Register a component with this context.
178    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
179        info!(scheme = component.scheme(), "Registering component");
180        self.registry
181            .lock()
182            .expect("mutex poisoned: another thread panicked while holding this lock")
183            .register(component);
184    }
185
186    /// Register a language with this context, keyed by name.
187    ///
188    /// Returns `Err(LanguageError::AlreadyRegistered)` if a language with the
189    /// same name is already registered. Use [`resolve_language`](Self::resolve_language)
190    /// to check before registering, or choose a distinct name.
191    pub fn register_language(
192        &mut self,
193        name: impl Into<String>,
194        lang: Box<dyn Language>,
195    ) -> Result<(), LanguageError> {
196        let name = name.into();
197        let mut languages = self
198            .languages
199            .lock()
200            .expect("mutex poisoned: another thread panicked while holding this lock");
201        if languages.contains_key(&name) {
202            return Err(LanguageError::AlreadyRegistered(name));
203        }
204        languages.insert(name, Arc::from(lang));
205        Ok(())
206    }
207
208    /// Resolve a language by name. Returns `None` if not registered.
209    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
210        let languages = self
211            .languages
212            .lock()
213            .expect("mutex poisoned: another thread panicked while holding this lock");
214        languages.get(name).cloned()
215    }
216
217    /// Add a route definition to this context.
218    ///
219    /// The route must have an ID. Steps are resolved immediately using registered components.
220    pub fn add_route_definition(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
221        info!(from = definition.from_uri(), route_id = %definition.route_id(), "Adding route definition");
222
223        self.route_controller
224            .try_lock()
225            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
226            .add_route(definition)
227    }
228
229    /// Access the component registry.
230    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
231        self.registry
232            .lock()
233            .expect("mutex poisoned: another thread panicked while holding this lock")
234    }
235
236    /// Access the route controller.
237    pub fn route_controller(&self) -> &Arc<Mutex<dyn RouteControllerInternal>> {
238        &self.route_controller
239    }
240
241    /// Get the metrics collector.
242    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
243        Arc::clone(&self.metrics)
244    }
245
246    /// Get the status of a route by ID.
247    pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
248        self.route_controller
249            .try_lock()
250            .ok()?
251            .route_status(route_id)
252    }
253
254    /// Start all routes. Each route's consumer will begin producing exchanges.
255    ///
256    /// Only routes with `auto_startup == true` will be started, in order of their
257    /// `startup_order` (lower values start first).
258    pub async fn start(&mut self) -> Result<(), CamelError> {
259        info!("Starting CamelContext");
260
261        // Start lifecycle services first
262        for (i, service) in self.services.iter_mut().enumerate() {
263            info!("Starting service: {}", service.name());
264            if let Err(e) = service.start().await {
265                // Rollback: stop already started services in reverse order
266                warn!(
267                    "Service {} failed to start, rolling back {} services",
268                    service.name(),
269                    i
270                );
271                for j in (0..i).rev() {
272                    if let Err(rollback_err) = self.services[j].stop().await {
273                        warn!(
274                            "Failed to stop service {} during rollback: {}",
275                            self.services[j].name(),
276                            rollback_err
277                        );
278                    }
279                }
280                return Err(e);
281            }
282        }
283
284        // Then start routes
285        self.route_controller
286            .lock()
287            .await
288            .start_all_routes()
289            .await?;
290
291        info!("CamelContext started");
292        Ok(())
293    }
294
295    /// Graceful shutdown with default 30-second timeout.
296    pub async fn stop(&mut self) -> Result<(), CamelError> {
297        self.stop_timeout(self.shutdown_timeout).await
298    }
299
300    /// Graceful shutdown with custom timeout.
301    ///
302    /// Note: The timeout parameter is currently not used directly; the RouteController
303    /// manages its own shutdown timeout. This may change in a future version.
304    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
305        info!("Stopping CamelContext");
306
307        // Signal cancellation (for any legacy code that might use it)
308        self.cancel_token.cancel();
309
310        // Stop all routes via the controller
311        self.route_controller.lock().await.stop_all_routes().await?;
312
313        // Then stop lifecycle services
314        // Continue stopping all services even if some fail
315        let mut first_error = None;
316        for service in &mut self.services {
317            info!("Stopping service: {}", service.name());
318            if let Err(e) = service.stop().await {
319                warn!("Service {} failed to stop: {}", service.name(), e);
320                if first_error.is_none() {
321                    first_error = Some(e);
322                }
323            }
324        }
325
326        info!("CamelContext stopped");
327
328        if let Some(e) = first_error {
329            Err(e)
330        } else {
331            Ok(())
332        }
333    }
334
335    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
336    pub fn shutdown_timeout(&self) -> std::time::Duration {
337        self.shutdown_timeout
338    }
339
340    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
341    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
342        self.shutdown_timeout = timeout;
343    }
344
345    /// Immediate abort — kills all tasks without draining.
346    pub async fn abort(&mut self) {
347        self.cancel_token.cancel();
348        let _ = self.route_controller.lock().await.stop_all_routes().await;
349    }
350
351    /// Check health status of all registered services.
352    pub fn health_check(&self) -> HealthReport {
353        let services: Vec<ServiceHealth> = self
354            .services
355            .iter()
356            .map(|s| ServiceHealth {
357                name: s.name().to_string(),
358                status: s.status(),
359            })
360            .collect();
361
362        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
363            HealthStatus::Healthy
364        } else {
365            HealthStatus::Unhealthy
366        };
367
368        HealthReport {
369            status,
370            services,
371            ..Default::default()
372        }
373    }
374}
375
376impl Default for CamelContext {
377    fn default() -> Self {
378        Self::new()
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::route::{BuilderStep, LanguageExpressionDef, RouteDefinition};
    use camel_api::CamelError;
    use camel_component::Endpoint;
    use camel_language_api::{Expression, Predicate};

    /// Component stub for the `mock` scheme; it never produces an endpoint.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound("mock".to_string()))
        }
    }

    /// Language stub named `dummy` whose factory methods always fail.
    struct DummyLang;

    impl Language for DummyLang {
        fn name(&self) -> &'static str {
            "dummy"
        }

        fn create_expression(&self, _: &str) -> Result<Box<dyn Expression>, LanguageError> {
            Err(LanguageError::EvalError("not implemented".into()))
        }

        fn create_predicate(&self, _: &str) -> Result<Box<dyn Predicate>, LanguageError> {
            Err(LanguageError::EvalError("not implemented".into()))
        }
    }

    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut context = CamelContext::new();

        // Register a component successfully
        context.register_component(MockComponent);

        // NOTE(review): nothing in this test poisons the registry mutex, so it
        // only verifies that taking the registry lock on a healthy context
        // does not panic.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            drop(context.registry());
        }));

        assert!(
            outcome.is_ok(),
            "Registry access should handle mutex poisoning"
        );
    }

    #[test]
    fn test_context_resolves_simple_language() {
        let context = CamelContext::new();
        let simple = context
            .resolve_language("simple")
            .expect("simple language not found");
        assert_eq!(simple.name(), "simple");
    }

    #[test]
    fn test_simple_language_via_context() {
        let context = CamelContext::new();
        let simple = context.resolve_language("simple").unwrap();
        let predicate = simple.create_predicate("${header.x} == 'hello'").unwrap();

        let mut message = camel_api::message::Message::default();
        message.set_header("x", camel_api::Value::String("hello".into()));
        let exchange = camel_api::exchange::Exchange::new(message);

        assert!(predicate.matches(&exchange).unwrap());
    }

    #[test]
    fn test_resolve_unknown_language_returns_none() {
        let context = CamelContext::new();
        let missing = context.resolve_language("nonexistent");
        assert!(missing.is_none());
    }

    #[test]
    fn test_register_language_duplicate_returns_error() {
        let mut context = CamelContext::new();
        context
            .register_language("dummy", Box::new(DummyLang))
            .unwrap();

        // A second registration under the same key must be rejected.
        let second = context.register_language("dummy", Box::new(DummyLang));
        assert!(second.is_err(), "duplicate registration should fail");

        let message = second.unwrap_err().to_string();
        assert!(
            message.contains("dummy"),
            "error should mention the language name"
        );
    }

    #[test]
    fn test_register_language_new_key_succeeds() {
        let mut context = CamelContext::new();
        let first = context.register_language("dummy", Box::new(DummyLang));
        assert!(first.is_ok(), "first registration should succeed");
    }

    #[test]
    fn test_add_route_definition_uses_runtime_registered_language() {
        /// Expression that always evaluates to the string `ok`.
        struct DummyExpression;
        impl Expression for DummyExpression {
            fn evaluate(
                &self,
                _exchange: &camel_api::Exchange,
            ) -> Result<camel_api::Value, LanguageError> {
                Ok(camel_api::Value::String("ok".into()))
            }
        }

        /// Predicate that always matches.
        struct DummyPredicate;
        impl Predicate for DummyPredicate {
            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
                Ok(true)
            }
        }

        /// Language registered at runtime under the name `runtime`.
        struct RuntimeLang;
        impl Language for RuntimeLang {
            fn name(&self) -> &'static str {
                "runtime"
            }

            fn create_expression(
                &self,
                _script: &str,
            ) -> Result<Box<dyn Expression>, LanguageError> {
                Ok(Box::new(DummyExpression))
            }

            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
                Ok(Box::new(DummyPredicate))
            }
        }

        let mut context = CamelContext::new();
        context
            .register_language("runtime", Box::new(RuntimeLang))
            .unwrap();

        let script_step = BuilderStep::DeclarativeScript {
            expression: LanguageExpressionDef {
                language: "runtime".into(),
                source: "${body}".into(),
            },
        };
        let definition = RouteDefinition::new("timer:tick", vec![script_step])
            .with_route_id("runtime-lang-route");

        let result = context.add_route_definition(definition);
        assert!(
            result.is_ok(),
            "route should resolve runtime language: {result:?}"
        );
    }

    #[test]
    fn test_add_route_definition_fails_for_unregistered_runtime_language() {
        let mut context = CamelContext::new();

        let set_body_step = BuilderStep::DeclarativeSetBody {
            value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
                language: "missing-lang".into(),
                source: "${body}".into(),
            }),
        };
        let definition = RouteDefinition::new("timer:tick", vec![set_body_step])
            .with_route_id("missing-runtime-lang-route");

        let result = context.add_route_definition(definition);
        assert!(
            result.is_err(),
            "route should fail when language is missing"
        );

        let error_text = result.unwrap_err().to_string();
        assert!(
            error_text.contains("missing-lang"),
            "error should mention missing language, got: {error_text}"
        );
    }

    #[test]
    fn test_health_check_empty_context() {
        let context = CamelContext::new();
        let report = context.health_check();

        // No services registered: the report is healthy and lists nothing.
        assert_eq!(report.status, HealthStatus::Healthy);
        assert!(report.services.is_empty());
    }
}
602
#[cfg(test)]
mod lifecycle_tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::Lifecycle;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Lifecycle service that counts how often it is started and stopped.
    struct MockService {
        start_count: Arc<AtomicUsize>,
        stop_count: Arc<AtomicUsize>,
    }

    impl MockService {
        /// Build the service together with handles to its start and stop counters.
        fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let starts = Arc::new(AtomicUsize::new(0));
            let stops = Arc::new(AtomicUsize::new(0));
            let service = Self {
                start_count: Arc::clone(&starts),
                stop_count: Arc::clone(&stops),
            };
            (service, starts, stops)
        }
    }

    #[async_trait]
    impl Lifecycle for MockService {
        fn name(&self) -> &str {
            "mock"
        }

        async fn start(&mut self) -> Result<(), CamelError> {
            self.start_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            self.stop_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_context_starts_lifecycle_services() {
        let (service, starts, stops) = MockService::new();
        let mut context = CamelContext::new().with_lifecycle(service);

        // Registration alone must not start the service.
        assert_eq!(starts.load(Ordering::SeqCst), 0);

        context.start().await.unwrap();
        assert_eq!(starts.load(Ordering::SeqCst), 1);
        assert_eq!(stops.load(Ordering::SeqCst), 0);

        context.stop().await.unwrap();
        assert_eq!(stops.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_service_start_failure_rollback() {
        /// Counting service whose `start` fails when `should_fail` is set.
        struct FailingService {
            start_count: Arc<AtomicUsize>,
            stop_count: Arc<AtomicUsize>,
            should_fail: bool,
        }

        #[async_trait]
        impl Lifecycle for FailingService {
            fn name(&self) -> &str {
                "failing"
            }

            async fn start(&mut self) -> Result<(), CamelError> {
                self.start_count.fetch_add(1, Ordering::SeqCst);
                if !self.should_fail {
                    return Ok(());
                }
                Err(CamelError::ProcessorError("intentional failure".into()))
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                self.stop_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }

        // One (start, stop) counter pair per service.
        let counters = || (Arc::new(AtomicUsize::new(0)), Arc::new(AtomicUsize::new(0)));
        let (start1, stop1) = counters();
        let (start2, stop2) = counters();
        let (start3, stop3) = counters();

        let build = |starts: &Arc<AtomicUsize>, stops: &Arc<AtomicUsize>, should_fail: bool| {
            FailingService {
                start_count: Arc::clone(starts),
                stop_count: Arc::clone(stops),
                should_fail,
            }
        };

        // Only the second service fails to start.
        let mut context = CamelContext::new()
            .with_lifecycle(build(&start1, &stop1, false))
            .with_lifecycle(build(&start2, &stop2, true))
            .with_lifecycle(build(&start3, &stop3, false));

        // Attempt to start - should fail
        let outcome = context.start().await;
        assert!(outcome.is_err());

        // Verify service1 was started and then stopped (rollback)
        assert_eq!(start1.load(Ordering::SeqCst), 1);
        assert_eq!(stop1.load(Ordering::SeqCst), 1);

        // Verify service2 was attempted to start but failed
        assert_eq!(start2.load(Ordering::SeqCst), 1);
        assert_eq!(stop2.load(Ordering::SeqCst), 0);

        // Verify service3 was never started
        assert_eq!(start3.load(Ordering::SeqCst), 0);
        assert_eq!(stop3.load(Ordering::SeqCst), 0);
    }
}
739}