1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use tokio::sync::Mutex;
4use tokio_util::sync::CancellationToken;
5use tracing::info;
6
7use camel_api::error_handler::ErrorHandlerConfig;
8use camel_api::{CamelError, MetricsCollector, NoOpMetrics, RouteController, RouteStatus};
9use camel_component::Component;
10
11use crate::registry::Registry;
12use crate::route::RouteDefinition;
13use crate::route_controller::DefaultRouteController;
14
/// Process-wide sequence used to number routes that were added without an
/// explicit id. The first generated id uses the value 1.
static ROUTE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the next automatically generated route id, formatted as `route-<n>`.
///
/// Each call atomically takes the current counter value and advances the
/// counter by one, so ids are unique within the process.
fn generate_route_id() -> String {
    let sequence = ROUTE_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
    format!("route-{}", sequence)
}
22
/// Runtime object that owns the component registry, the route controller,
/// a cancellation token and the metrics collector.
pub struct CamelContext {
    /// Component registry behind a blocking `std::sync::Mutex`; a clone of
    /// this handle is passed to the route controller at construction time.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Route controller behind an async `tokio::sync::Mutex`.
    route_controller: Arc<Mutex<DefaultRouteController>>,
    /// Token that is cancelled when the context is stopped or aborted.
    cancel_token: CancellationToken,
    /// Metrics collector handed out by `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
}
36
37impl CamelContext {
38 pub fn new() -> Self {
40 Self::with_metrics(Arc::new(NoOpMetrics))
41 }
42
43 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
45 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
46 let controller = Arc::new(Mutex::new(DefaultRouteController::new(Arc::clone(
47 ®istry,
48 ))));
49
50 controller
53 .try_lock()
54 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
55 .set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
56
57 Self {
58 registry,
59 route_controller: controller,
60 cancel_token: CancellationToken::new(),
61 metrics,
62 }
63 }
64
65 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
67 self.route_controller
68 .try_lock()
69 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
70 .set_error_handler(config);
71 }
72
73 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
75 info!(scheme = component.scheme(), "Registering component");
76 self.registry
77 .lock()
78 .expect("mutex poisoned: another thread panicked while holding this lock")
79 .register(component);
80 }
81
82 pub fn add_route_definition(
87 &mut self,
88 mut definition: RouteDefinition,
89 ) -> Result<(), CamelError> {
90 if definition.route_id().is_none() {
92 definition = definition.with_route_id(generate_route_id());
93 }
94
95 info!(from = definition.from_uri(), route_id = ?definition.route_id(), "Adding route definition");
96
97 self.route_controller
98 .try_lock()
99 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
100 .add_route(definition)
101 }
102
103 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
105 self.registry
106 .lock()
107 .expect("mutex poisoned: another thread panicked while holding this lock")
108 }
109
110 pub fn route_controller(&self) -> &Arc<Mutex<DefaultRouteController>> {
112 &self.route_controller
113 }
114
115 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
117 Arc::clone(&self.metrics)
118 }
119
120 pub fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
122 self.route_controller
123 .try_lock()
124 .ok()?
125 .route_status(route_id)
126 }
127
128 pub async fn start(&mut self) -> Result<(), CamelError> {
133 info!("Starting CamelContext");
134 self.route_controller
135 .lock()
136 .await
137 .start_all_routes()
138 .await?;
139 info!("CamelContext started");
140 Ok(())
141 }
142
143 pub async fn stop(&mut self) -> Result<(), CamelError> {
145 self.stop_timeout(std::time::Duration::from_secs(30)).await
146 }
147
148 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
153 info!("Stopping CamelContext");
154
155 self.cancel_token.cancel();
157
158 self.route_controller.lock().await.stop_all_routes().await?;
160
161 info!("CamelContext stopped");
162 Ok(())
163 }
164
165 pub async fn abort(&mut self) {
167 self.cancel_token.cancel();
168 let _ = self.route_controller.lock().await.stop_all_routes().await;
169 }
170}
171
172impl Default for CamelContext {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::CamelError;
    use camel_component::Endpoint;

    /// Component stub with the scheme `mock` whose endpoint creation always fails.
    struct MockComponent;

    impl Component for MockComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Err(CamelError::ComponentNotFound(String::from("mock")))
        }
    }

    /// Registers a component, then checks that taking the registry guard does
    /// not panic. Nothing in this test panics while holding the lock, so the
    /// mutex is never actually poisoned here.
    #[test]
    fn test_context_handles_mutex_poisoning_gracefully() {
        let mut context = CamelContext::new();
        context.register_component(MockComponent);

        // Acquire the registry guard and release it immediately.
        let access = || drop(context.registry());
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(access));

        assert!(outcome.is_ok(), "Registry access should handle mutex poisoning");
    }
}