Skip to main content

camel_core/
context.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus};
9use camel_component::Component;
10
11use crate::registry::Registry;
12use crate::route::RouteDefinition;
13use crate::route_controller::DefaultRouteController;
14
15/// Counter for generating unique route IDs.
16static ROUTE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
17
18/// Generate a unique route ID.
19fn generate_route_id() -> String {
20    format!("route-{}", ROUTE_ID_COUNTER.fetch_add(1, Ordering::SeqCst))
21}
22
23/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
24///
25/// # Lifecycle
26///
27/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
28/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
29/// stopped context is not supported — create a new instance instead.
30pub struct CamelContext {
31    registry: Arc<std::sync::Mutex<Registry>>,
32    route_controller: Arc<Mutex<DefaultRouteController>>,
33    cancel_token: CancellationToken,
34    metrics: Arc<dyn MetricsCollector>,
35}
36
37impl CamelContext {
38    /// Create a new, empty CamelContext.
39    pub fn new() -> Self {
40        Self::with_metrics(Arc::new(NoOpMetrics))
41    }
42
43    /// Create a new CamelContext with a custom metrics collector.
44    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
45        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
46        let controller = Arc::new(Mutex::new(DefaultRouteController::new(Arc::clone(
47            &registry,
48        ))));
49
50        // Set self-ref so DefaultRouteController can create ProducerContext
51        // Use try_lock since we just created it and nobody else has access yet
52        controller
53            .try_lock()
54            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
55            .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
56
57        Self {
58            registry,
59            route_controller: controller,
60            cancel_token: CancellationToken::new(),
61            metrics,
62        }
63    }
64
65    /// Set a global error handler applied to all routes without a per-route handler.
66    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
67        self.route_controller
68            .try_lock()
69            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
70            .set_error_handler(config);
71    }
72
73    /// Register a component with this context.
74    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
75        info!(scheme = component.scheme(), "Registering component");
76        self.registry
77            .lock()
78            .expect("mutex poisoned: another thread panicked while holding this lock")
79            .register(component);
80    }
81
82    /// Add a route definition to this context.
83    ///
84    /// The route must have an ID. If None, one will be generated automatically.
85    /// Steps are resolved immediately using registered components.
86    pub fn add_route_definition(
87        &mut self,
88        mut definition: RouteDefinition,
89    ) -> Result<(), CamelError> {
90        // Auto-generate route ID if not set
91        if definition.route_id().is_none() {
92            definition = definition.with_route_id(generate_route_id());
93        }
94
95        info!(from = definition.from_uri(), route_id = ?definition.route_id(), "Adding route definition");
96
97        self.route_controller
98            .try_lock()
99            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
100            .add_route(definition)
101    }
102
103    /// Access the component registry.
104    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
105        self.registry
106            .lock()
107            .expect("mutex poisoned: another thread panicked while holding this lock")
108    }
109
110    /// Access the route controller.
111    pub fn route_controller(&self) -> &Arc<Mutex<DefaultRouteController>> {
112        &self.route_controller
113    }
114
115    /// Get the metrics collector.
116    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
117        Arc::clone(&self.metrics)
118    }
119
120    /// Get the status of a route by ID.
121    pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
122        self.route_controller
123            .try_lock()
124            .ok()?
125            .route_status(route_id)
126    }
127
128    /// Start all routes. Each route's consumer will begin producing exchanges.
129    ///
130    /// Only routes with `auto_startup == true` will be started, in order of their
131    /// `startup_order` (lower values start first).
132    pub async fn start(&mut self) -> Result<(), CamelError> {
133        info!("Starting CamelContext");
134        self.route_controller
135            .lock()
136            .await
137            .start_all_routes()
138            .await?;
139        info!("CamelContext started");
140        Ok(())
141    }
142
143    /// Graceful shutdown with default 30-second timeout.
144    pub async fn stop(&mut self) -> Result<(), CamelError> {
145        self.stop_timeout(std::time::Duration::from_secs(30)).await
146    }
147
148    /// Graceful shutdown with custom timeout.
149    ///
150    /// Note: The timeout parameter is currently not used directly; the RouteController
151    /// manages its own shutdown timeout. This may change in a future version.
152    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
153        info!("Stopping CamelContext");
154
155        // Signal cancellation (for any legacy code that might use it)
156        self.cancel_token.cancel();
157
158        // Stop all routes via the controller
159        self.route_controller.lock().await.stop_all_routes().await?;
160
161        info!("CamelContext stopped");
162        Ok(())
163    }
164
165    /// Immediate abort — kills all tasks without draining.
166    pub async fn abort(&mut self) {
167        self.cancel_token.cancel();
168        let _ = self.route_controller.lock().await.stop_all_routes().await;
169    }
170}
171
172impl Default for CamelContext {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181    use camel_api::CamelError;
182    use camel_component::Endpoint;
183
184    /// Mock component for testing
185    struct MockComponent;
186
187    impl Component for MockComponent {
188        fn scheme(&self) -> &str {
189            "mock"
190        }
191
192        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
193            Err(CamelError::ComponentNotFound("mock".to_string()))
194        }
195    }
196
197    #[test]
198    fn test_context_handles_mutex_poisoning_gracefully() {
199        let mut ctx = CamelContext::new();
200
201        // Register a component successfully
202        ctx.register_component(MockComponent);
203
204        // Access registry should work even after potential panic in another thread
205        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
206            let _guard = ctx.registry();
207        }));
208
209        assert!(
210            result.is_ok(),
211            "Registry access should handle mutex poisoning"
212        );
213    }
214}