1use camel_api::datasource::DatasourceCatalog;
12use camel_bean::BeanProcessor;
13use camel_core::datasource::RuntimeDatasourceCatalog;
14use camel_language_js::JsLanguage;
15use camel_language_jsonpath::JsonPathLanguage;
16use camel_language_rhai::RhaiLanguage;
17use camel_language_xpath::XPathLanguage;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio_util::sync::CancellationToken;
21
22struct BridgeCleanup {
23 xslt: Arc<camel_xslt::XsltBridgeRuntime>,
24 xj: Arc<camel_xj::XjBridgeRuntime>,
25 validator: Option<Arc<camel_component_validator::xsd_bridge::XsdBridgeBackend>>,
26}
27
28#[async_trait::async_trait]
29impl camel_api::lifecycle::Lifecycle for BridgeCleanup {
30 fn name(&self) -> &str {
31 "bridge-cleanup"
32 }
33
34 async fn start(&mut self) -> Result<(), camel_api::CamelError> {
35 Ok(())
36 }
37
38 async fn stop(&mut self) -> Result<(), camel_api::CamelError> {
39 self.xslt.shutdown().await;
40 self.xj.shutdown().await;
41 if let Some(validator) = &self.validator {
42 validator.shutdown().await;
43 }
44 Ok(())
45 }
46}
47
48pub async fn run(
49 routes_override: Option<String>,
50 config_path: String,
51 cli_watch: Option<bool>,
52 otel: bool,
53 otel_endpoint: Option<String>,
54 service_name: Option<String>,
55 health_port: Option<u16>,
56) -> Result<(), camel_api::CamelError> {
57 let mut camel_config: camel_config::config::CamelConfig =
59 camel_config::config::CamelConfig::from_file(&config_path).unwrap_or_else(|_| {
60 config::Config::builder()
62 .build()
63 .and_then(|c| c.try_deserialize())
64 .unwrap_or_else(|e| {
65 eprintln!("Failed to build default config: {e}");
66 std::process::exit(1);
67 })
68 });
69
70 let otel_enabled = otel || otel_endpoint.is_some() || service_name.is_some();
72 if otel_enabled {
73 let otel_cfg =
74 camel_config
75 .observability
76 .otel
77 .get_or_insert(camel_config::OtelCamelConfig {
78 enabled: true,
79 endpoint: "http://localhost:4317".to_string(),
80 service_name: "rust-camel".to_string(),
81 ..Default::default()
82 });
83 otel_cfg.enabled = true;
84 if let Some(ep) = otel_endpoint {
85 otel_cfg.endpoint = ep;
86 }
87 if let Some(name) = service_name {
88 otel_cfg.service_name = name;
89 }
90 }
91
92 if let Some(port) = health_port {
93 let health_cfg = camel_config
94 .observability
95 .health
96 .get_or_insert(camel_config::config::HealthCamelConfig::default());
97 health_cfg.enabled = true;
98 health_cfg.port = port;
99 }
100
101 let beans_registry = {
103 let bean_reg = std::sync::Arc::new(std::sync::Mutex::new(camel_bean::BeanRegistry::new()));
104 if camel_config.beans.is_empty() {
105 None
106 } else {
107 Some(bean_reg)
108 }
109 };
110
111 let mut ctx = camel_config::config::CamelConfig::configure_context_with_beans(
112 &camel_config,
113 beans_registry.clone(),
114 )
115 .await
116 .unwrap_or_else(|e| {
117 eprintln!("Failed to configure CamelContext: {e}");
118 std::process::exit(1);
119 });
120
121 match camel_function::FunctionRuntimeService::with_default_container_provider(
122 camel_function::FunctionConfig::default(),
123 ) {
124 Ok(svc) => ctx = ctx.with_lifecycle(svc),
125 Err(e) => tracing::warn!("Function runtime disabled: {e}"),
126 }
127
128 let datasource_catalog: Arc<dyn DatasourceCatalog> = {
130 let catalog = RuntimeDatasourceCatalog::new(camel_config.datasources.clone())
131 .with_health_registry(ctx.health_registry());
132 Arc::new(catalog)
133 };
134
135 #[cfg(feature = "wasm")]
137 if let Some(ref bean_reg) = beans_registry {
138 let component_registry = ctx.registry_arc();
139 let plugins_dir_raw = camel_config
140 .components
141 .raw
142 .get("wasm")
143 .and_then(|v| v.get("plugins_dir"))
144 .and_then(|v| v.as_str())
145 .unwrap_or("plugins");
146 let config_dir = std::path::Path::new(&config_path)
147 .parent()
148 .map(|p| {
149 if p.as_os_str().is_empty() {
150 std::path::Path::new(".")
151 } else {
152 p
153 }
154 })
155 .unwrap_or(std::path::Path::new("."));
156 let camel_root = config_dir.canonicalize().unwrap_or_else(|e| {
157 eprintln!("Error: cannot resolve project root: {e}");
158 std::process::exit(1);
159 });
160 crate::commands::plugin::validate_plugins_dir(&camel_root, plugins_dir_raw).unwrap_or_else(
161 |e| {
162 eprintln!("Error: invalid plugins_dir: {e}");
163 std::process::exit(1);
164 },
165 );
166 let plugins_dir = camel_root.join(plugins_dir_raw);
167 for (bean_name, bean_cfg) in &camel_config.beans {
168 tracing::info!(bean = %bean_name, plugin = %bean_cfg.plugin, "registering WASM bean");
169
170 if !bean_cfg
171 .plugin
172 .chars()
173 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
174 {
175 eprintln!(
176 "Invalid bean plugin name '{}': must be alphanumeric with - or _",
177 bean_cfg.plugin
178 );
179 std::process::exit(1);
180 }
181
182 let wasm_path = plugins_dir.join(format!("{}.wasm", bean_cfg.plugin));
183 let canonical_plugins = plugins_dir.canonicalize().unwrap_or_else(|_| {
184 eprintln!("Plugins directory not found: {}", plugins_dir.display());
185 std::process::exit(1);
186 });
187 let canonical_path = wasm_path.canonicalize().unwrap_or_else(|_| {
188 eprintln!("WASM bean plugin not found: {}", wasm_path.display());
189 std::process::exit(1);
190 });
191 if !canonical_path.starts_with(&canonical_plugins) {
192 eprintln!(
193 "Bean plugin path escapes plugins directory: {}",
194 bean_cfg.plugin
195 );
196 std::process::exit(1);
197 }
198 let wasm_config =
199 camel_component_wasm::config::WasmConfig::from_limits(&bean_cfg.limits);
200 let wasm_bean = camel_component_wasm::bean::WasmBean::new(
201 &wasm_path,
202 wasm_config,
203 component_registry.clone(),
204 bean_cfg.config.clone(),
205 )
206 .await
207 .unwrap_or_else(|e| {
208 eprintln!("Failed to load WASM bean '{}': {}", bean_name, e);
209 std::process::exit(1);
210 });
211 tracing::info!(
212 bean = %bean_name,
213 plugin = %bean_cfg.plugin,
214 methods = ?wasm_bean.methods(),
215 "WASM bean loaded"
216 );
217 bean_reg
218 .lock()
219 .expect("beans registry lock") .register(bean_name, wasm_bean)
221 .unwrap_or_else(|e| {
222 eprintln!("Bean registration failed for '{}': {}", bean_name, e);
223 std::process::exit(1);
224 });
225 }
226 }
227
228 let patterns: Vec<String> = if let Some(p) = routes_override {
230 vec![p]
231 } else if !camel_config.routes.is_empty() {
232 camel_config.routes.clone()
233 } else {
234 vec!["routes/*.yaml".to_string()]
235 };
236
237 tracing::info!("camel-cli: loading routes from patterns: {:?}", patterns);
238
239 let security_compile_context =
240 crate::build_security_compile_context_from_config(&camel_config, ctx.registry_arc())
241 .await?;
242
243 macro_rules! register_bundle {
247 ($ctx:expr, $cfg:expr, $Bundle:ty) => {
248 let raw = $cfg
249 .components
250 .raw
251 .get(<$Bundle as camel_component_api::ComponentBundle>::config_key())
252 .cloned()
253 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
254 match <$Bundle as camel_component_api::ComponentBundle>::from_toml(raw) {
255 Ok(bundle) => <$Bundle as camel_component_api::ComponentBundle>::register_all(
256 bundle, &mut $ctx,
257 ),
258 Err(e) => {
259 return Err(camel_api::CamelError::Config(format!(
260 "Failed to load {} config: {}",
261 <$Bundle as camel_component_api::ComponentBundle>::config_key(),
262 e
263 )));
264 }
265 }
266 };
267 }
268
269 ctx.register_component(camel_component_timer::TimerComponent::new());
271 ctx.register_component(camel_component_log::LogComponent::new());
272 ctx.register_component(camel_component_direct::DirectComponent::new());
273 ctx.register_component(camel_component_seda::SedaComponent::new());
274 ctx.register_component(camel_component_mock::MockComponent::new());
275 ctx.register_component(camel_component_controlbus::ControlBusComponent::new());
276 let validator_component = camel_component_validator::ValidatorComponent::new();
277 let validator_backend = validator_component.xsd_bridge_backend();
278 ctx.register_component(validator_component);
279
280 let xslt_component = camel_xslt::XsltComponent::default();
281 let xslt_runtime = xslt_component.bridge_runtime();
282 ctx.register_component(xslt_component);
283
284 let xj_component = camel_xj::XjComponent::default();
285 let xj_runtime = xj_component.bridge_runtime();
286 ctx.register_component(xj_component);
287
288 ctx = ctx.with_lifecycle(BridgeCleanup {
289 xslt: xslt_runtime,
290 xj: xj_runtime,
291 validator: validator_backend,
292 });
293
294 register_bundle!(ctx, camel_config, camel_component_http::HttpBundle);
296 #[cfg(feature = "http-static")]
297 register_bundle!(ctx, camel_config, camel_component_http::HttpStaticBundle);
298 register_bundle!(ctx, camel_config, camel_component_ws::WsBundle);
299 register_bundle!(ctx, camel_config, camel_component_file::FileBundle);
300 register_bundle!(
301 ctx,
302 camel_config,
303 camel_component_container::ContainerBundle
304 );
305
306 let jms_pool = {
308 let raw = camel_config
309 .components
310 .raw
311 .get("jms")
312 .cloned()
313 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
314 match <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::from_toml(
315 raw,
316 ) {
317 Ok(bundle) => {
318 let pool = bundle.pool();
319 <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
320 pool
321 }
322 Err(e) => {
323 return Err(camel_api::CamelError::Config(format!(
324 "Failed to load jms config: {e}"
325 )));
326 }
327 }
328 };
329
330 let cxf_pool = {
331 let raw = camel_config
332 .components
333 .raw
334 .get("cxf")
335 .cloned()
336 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
337 match <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::from_toml(
338 raw,
339 ) {
340 Ok(bundle) => {
341 let pool = bundle.pool();
342 <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
343 pool
344 }
345 Err(e) => {
346 return Err(camel_api::CamelError::Config(format!(
347 "Failed to load cxf config: {e}"
348 )));
349 }
350 }
351 };
352
353 #[cfg(feature = "kafka")]
354 register_bundle!(ctx, camel_config, camel_component_kafka::KafkaBundle);
355 register_bundle!(ctx, camel_config, camel_master::MasterBundle);
356 register_bundle!(
357 ctx,
358 camel_config,
359 camel_component_opensearch::OpenSearchBundle
360 );
361 register_bundle!(ctx, camel_config, camel_component_redis::RedisBundle);
362 {
363 let sql_raw = camel_config
364 .components
365 .raw
366 .get(<camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::config_key())
367 .cloned()
368 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
369 match <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::from_toml(
370 sql_raw,
371 ) {
372 Ok(bundle) => {
373 let bundle = bundle.with_catalog(Arc::clone(&datasource_catalog));
374 <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
375 }
376 Err(e) => {
377 tracing::error!("failed to initialize SQL bundle: {}", e);
379 }
380 }
381 }
382 #[cfg(feature = "grpc")]
383 register_bundle!(ctx, camel_config, camel_component_grpc::GrpcBundle);
384
385 #[cfg(feature = "wasm")]
386 {
387 let base_dir = std::path::Path::new(&config_path)
388 .parent()
389 .unwrap_or(std::path::Path::new("."))
390 .to_path_buf();
391 let wasm_bundle = camel_component_wasm::WasmBundle::new(ctx.registry_arc(), base_dir);
392 <camel_component_wasm::WasmBundle as camel_component_api::ComponentBundle>::register_all(
393 wasm_bundle,
394 &mut ctx,
395 );
396 }
397
398 ctx.register_language("js", Box::new(JsLanguage::new()))?;
401 ctx.register_language("javascript", Box::new(JsLanguage::new()))?;
402 ctx.register_language("rhai", Box::new(RhaiLanguage::new()))?;
403 ctx.register_language("jsonpath", Box::new(JsonPathLanguage::new()))?;
404 ctx.register_language("xpath", Box::new(XPathLanguage::new()))?;
405
406 match camel_dsl::discover_routes_with_threshold_and_security(
408 &patterns,
409 camel_config.stream_caching.threshold,
410 security_compile_context.clone(),
411 ) {
412 Ok(defs) => {
413 for def in defs {
414 let id = def.route_id().to_string();
415 if let Err(e) = ctx.add_route_definition(def).await {
416 tracing::error!("Failed to add route '{}': {}", id, e);
418 }
419 }
420 }
421 Err(e) => {
422 tracing::error!("Failed to discover routes: {}", e);
424 std::process::exit(1);
425 }
426 }
427
428 if let Err(e) = ctx.start().await {
430 tracing::error!("Failed to start CamelContext: {}", e);
432 std::process::exit(1);
433 }
434
435 tracing::info!("camel-cli: context started");
436
437 let watch_enabled = cli_watch.unwrap_or(camel_config.watch);
440
441 let watcher_shutdown = CancellationToken::new();
443 if watch_enabled {
444 let ctrl = ctx.runtime_execution_handle();
445 let watch_patterns = patterns.clone();
446 let watch_security_compile_context = security_compile_context.clone();
447 let drain_timeout = std::time::Duration::from_millis(camel_config.drain_timeout_ms);
448 let debounce = std::time::Duration::from_millis(camel_config.watch_debounce_ms);
449 let watcher_token = watcher_shutdown.clone();
450 tokio::spawn(async move {
451 let watch_dirs = camel_core::reload_watcher::resolve_watch_dirs(&watch_patterns);
452 let result = camel_core::reload_watcher::watch_and_reload(
453 watch_dirs,
454 ctrl,
455 move || {
456 camel_dsl::discover_routes_with_threshold_and_security(
457 &watch_patterns,
458 camel_config.stream_caching.threshold,
459 watch_security_compile_context.clone(),
460 )
461 .map_err(|e| camel_api::CamelError::RouteError(e.to_string()))
462 },
463 Some(watcher_token),
464 drain_timeout,
465 debounce,
466 )
467 .await;
468 if let Err(e) = result {
469 tracing::error!("File watcher failed: {}", e);
471 }
472 });
473 tracing::info!(
474 "camel-cli: hot-reload watching {:?}. Press Ctrl+C to stop.",
475 patterns
476 );
477 } else {
478 tracing::info!("camel-cli: running (hot-reload disabled). Press Ctrl+C to stop.");
479 }
480
481 tokio::select! {
482 _ = tokio::signal::ctrl_c() => tracing::info!("Received Ctrl+C"),
483 _ = async {
484 #[cfg(unix)]
485 {
486 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
487 .expect("Failed to install SIGTERM handler") .recv()
489 .await
490 }
491 #[cfg(not(unix))]
492 {
493 std::future::pending::<()>().await
494 }
495 } => tracing::info!("Received SIGTERM"),
496 }
497
498 let force_exit = tokio::spawn(async {
500 tokio::signal::ctrl_c().await.ok();
501 tracing::warn!("Second Ctrl+C — forcing exit");
502 std::process::exit(1);
503 });
504
505 tracing::info!("camel-cli: shutting down...");
506 watcher_shutdown.cancel();
507
508 jms_pool.begin_shutdown();
510 cxf_pool.begin_shutdown();
511
512 ctx.stop().await.unwrap_or_else(|e| {
514 tracing::error!("Error during shutdown: {}", e);
516 });
517
518 const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
520
521 match tokio::time::timeout(SHUTDOWN_TIMEOUT, jms_pool.shutdown()).await {
522 Ok(Ok(())) => {}
523 Ok(Err(e)) => {
524 tracing::error!("JMS pool shutdown failed: {}", e);
526 }
527 Err(_) => tracing::warn!("JMS pool shutdown timed out after 30s"),
528 }
529
530 match tokio::time::timeout(SHUTDOWN_TIMEOUT, cxf_pool.shutdown()).await {
531 Ok(Ok(())) => {}
532 Ok(Err(e)) => {
533 tracing::error!("CXF pool shutdown failed: {}", e);
535 }
536 Err(_) => tracing::warn!("CXF pool shutdown timed out after 30s"),
537 }
538
539 force_exit.abort();
540
541 tracing::info!("camel-cli: stopped");
542 Ok(())
543}