1use camel_bean::BeanProcessor;
12use camel_language_js::JsLanguage;
13use camel_language_jsonpath::JsonPathLanguage;
14use camel_language_rhai::RhaiLanguage;
15use camel_language_xpath::XPathLanguage;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio_util::sync::CancellationToken;
19
20struct BridgeCleanup {
21 xslt: Arc<camel_xslt::XsltBridgeRuntime>,
22 xj: Arc<camel_xj::XjBridgeRuntime>,
23 validator: Option<Arc<camel_component_validator::xsd_bridge::XsdBridgeBackend>>,
24}
25
26#[async_trait::async_trait]
27impl camel_api::lifecycle::Lifecycle for BridgeCleanup {
28 fn name(&self) -> &str {
29 "bridge-cleanup"
30 }
31
32 async fn start(&mut self) -> Result<(), camel_api::CamelError> {
33 Ok(())
34 }
35
36 async fn stop(&mut self) -> Result<(), camel_api::CamelError> {
37 self.xslt.shutdown().await;
38 self.xj.shutdown().await;
39 if let Some(validator) = &self.validator {
40 validator.shutdown().await;
41 }
42 Ok(())
43 }
44}
45
46pub async fn run(
47 routes_override: Option<String>,
48 config_path: String,
49 cli_watch: Option<bool>,
50 otel: bool,
51 otel_endpoint: Option<String>,
52 service_name: Option<String>,
53 health_port: Option<u16>,
54) -> Result<(), camel_api::CamelError> {
55 let mut camel_config: camel_config::config::CamelConfig =
57 camel_config::config::CamelConfig::from_file(&config_path).unwrap_or_else(|_| {
58 config::Config::builder()
60 .build()
61 .and_then(|c| c.try_deserialize())
62 .unwrap_or_else(|e| {
63 eprintln!("Failed to build default config: {e}");
64 std::process::exit(1);
65 })
66 });
67
68 let otel_enabled = otel || otel_endpoint.is_some() || service_name.is_some();
70 if otel_enabled {
71 let otel_cfg =
72 camel_config
73 .observability
74 .otel
75 .get_or_insert(camel_config::OtelCamelConfig {
76 enabled: true,
77 endpoint: "http://localhost:4317".to_string(),
78 service_name: "rust-camel".to_string(),
79 ..Default::default()
80 });
81 otel_cfg.enabled = true;
82 if let Some(ep) = otel_endpoint {
83 otel_cfg.endpoint = ep;
84 }
85 if let Some(name) = service_name {
86 otel_cfg.service_name = name;
87 }
88 }
89
90 if let Some(port) = health_port {
91 let health_cfg = camel_config
92 .observability
93 .health
94 .get_or_insert(camel_config::config::HealthCamelConfig::default());
95 health_cfg.enabled = true;
96 health_cfg.port = port;
97 }
98
99 let beans_registry = {
101 let bean_reg = std::sync::Arc::new(std::sync::Mutex::new(camel_bean::BeanRegistry::new()));
102 if camel_config.beans.is_empty() {
103 None
104 } else {
105 Some(bean_reg)
106 }
107 };
108
109 let mut ctx = camel_config::config::CamelConfig::configure_context_with_beans(
110 &camel_config,
111 beans_registry.clone(),
112 )
113 .await
114 .unwrap_or_else(|e| {
115 eprintln!("Failed to configure CamelContext: {e}");
116 std::process::exit(1);
117 });
118
119 match camel_function::FunctionRuntimeService::with_default_container_provider(
120 camel_function::FunctionConfig::default(),
121 ) {
122 Ok(svc) => ctx = ctx.with_lifecycle(svc),
123 Err(e) => tracing::warn!("Function runtime disabled: {e}"),
124 }
125
126 #[cfg(feature = "wasm")]
128 if let Some(ref bean_reg) = beans_registry {
129 let component_registry = ctx.registry_arc();
130 let plugins_dir_raw = camel_config
131 .components
132 .raw
133 .get("wasm")
134 .and_then(|v| v.get("plugins_dir"))
135 .and_then(|v| v.as_str())
136 .unwrap_or("plugins");
137 let config_dir = std::path::Path::new(&config_path)
138 .parent()
139 .map(|p| {
140 if p.as_os_str().is_empty() {
141 std::path::Path::new(".")
142 } else {
143 p
144 }
145 })
146 .unwrap_or(std::path::Path::new("."));
147 let camel_root = config_dir.canonicalize().unwrap_or_else(|e| {
148 eprintln!("Error: cannot resolve project root: {e}");
149 std::process::exit(1);
150 });
151 crate::commands::plugin::validate_plugins_dir(&camel_root, plugins_dir_raw).unwrap_or_else(
152 |e| {
153 eprintln!("Error: invalid plugins_dir: {e}");
154 std::process::exit(1);
155 },
156 );
157 let plugins_dir = camel_root.join(plugins_dir_raw);
158 for (bean_name, bean_cfg) in &camel_config.beans {
159 tracing::info!(bean = %bean_name, plugin = %bean_cfg.plugin, "registering WASM bean");
160
161 if !bean_cfg
162 .plugin
163 .chars()
164 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
165 {
166 eprintln!(
167 "Invalid bean plugin name '{}': must be alphanumeric with - or _",
168 bean_cfg.plugin
169 );
170 std::process::exit(1);
171 }
172
173 let wasm_path = plugins_dir.join(format!("{}.wasm", bean_cfg.plugin));
174 let canonical_plugins = plugins_dir.canonicalize().unwrap_or_else(|_| {
175 eprintln!("Plugins directory not found: {}", plugins_dir.display());
176 std::process::exit(1);
177 });
178 let canonical_path = wasm_path.canonicalize().unwrap_or_else(|_| {
179 eprintln!("WASM bean plugin not found: {}", wasm_path.display());
180 std::process::exit(1);
181 });
182 if !canonical_path.starts_with(&canonical_plugins) {
183 eprintln!(
184 "Bean plugin path escapes plugins directory: {}",
185 bean_cfg.plugin
186 );
187 std::process::exit(1);
188 }
189 let wasm_config =
190 camel_component_wasm::config::WasmConfig::from_limits(&bean_cfg.limits);
191 let wasm_bean = camel_component_wasm::bean::WasmBean::new(
192 &wasm_path,
193 wasm_config,
194 component_registry.clone(),
195 bean_cfg.config.clone(),
196 )
197 .await
198 .unwrap_or_else(|e| {
199 eprintln!("Failed to load WASM bean '{}': {}", bean_name, e);
200 std::process::exit(1);
201 });
202 tracing::info!(
203 bean = %bean_name,
204 plugin = %bean_cfg.plugin,
205 methods = ?wasm_bean.methods(),
206 "WASM bean loaded"
207 );
208 bean_reg
209 .lock()
210 .expect("beans registry lock") .register(bean_name, wasm_bean)
212 .unwrap_or_else(|e| {
213 eprintln!("Bean registration failed for '{}': {}", bean_name, e);
214 std::process::exit(1);
215 });
216 }
217 }
218
219 let patterns: Vec<String> = if let Some(p) = routes_override {
221 vec![p]
222 } else if !camel_config.routes.is_empty() {
223 camel_config.routes.clone()
224 } else {
225 vec!["routes/*.yaml".to_string()]
226 };
227
228 tracing::info!("camel-cli: loading routes from patterns: {:?}", patterns);
229
230 let security_compile_context =
231 crate::build_security_compile_context_from_config(&camel_config, ctx.registry_arc())
232 .await?;
233
234 macro_rules! register_bundle {
238 ($ctx:expr, $cfg:expr, $Bundle:ty) => {
239 let raw = $cfg
240 .components
241 .raw
242 .get(<$Bundle as camel_component_api::ComponentBundle>::config_key())
243 .cloned()
244 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
245 match <$Bundle as camel_component_api::ComponentBundle>::from_toml(raw) {
246 Ok(bundle) => <$Bundle as camel_component_api::ComponentBundle>::register_all(
247 bundle, &mut $ctx,
248 ),
249 Err(e) => {
250 return Err(camel_api::CamelError::Config(format!(
251 "Failed to load {} config: {}",
252 <$Bundle as camel_component_api::ComponentBundle>::config_key(),
253 e
254 )));
255 }
256 }
257 };
258 }
259
260 ctx.register_component(camel_component_timer::TimerComponent::new());
262 ctx.register_component(camel_component_log::LogComponent::new());
263 ctx.register_component(camel_component_direct::DirectComponent::new());
264 ctx.register_component(camel_component_seda::SedaComponent::new());
265 ctx.register_component(camel_component_mock::MockComponent::new());
266 ctx.register_component(camel_component_controlbus::ControlBusComponent::new());
267 let validator_component = camel_component_validator::ValidatorComponent::new();
268 let validator_backend = validator_component.xsd_bridge_backend();
269 ctx.register_component(validator_component);
270
271 let xslt_component = camel_xslt::XsltComponent::default();
272 let xslt_runtime = xslt_component.bridge_runtime();
273 ctx.register_component(xslt_component);
274
275 let xj_component = camel_xj::XjComponent::default();
276 let xj_runtime = xj_component.bridge_runtime();
277 ctx.register_component(xj_component);
278
279 ctx = ctx.with_lifecycle(BridgeCleanup {
280 xslt: xslt_runtime,
281 xj: xj_runtime,
282 validator: validator_backend,
283 });
284
285 register_bundle!(ctx, camel_config, camel_component_http::HttpBundle);
287 #[cfg(feature = "http-static")]
288 register_bundle!(ctx, camel_config, camel_component_http::HttpStaticBundle);
289 register_bundle!(ctx, camel_config, camel_component_ws::WsBundle);
290 register_bundle!(ctx, camel_config, camel_component_file::FileBundle);
291 register_bundle!(
292 ctx,
293 camel_config,
294 camel_component_container::ContainerBundle
295 );
296
297 let jms_pool = {
299 let raw = camel_config
300 .components
301 .raw
302 .get("jms")
303 .cloned()
304 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
305 match <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::from_toml(
306 raw,
307 ) {
308 Ok(bundle) => {
309 let pool = bundle.pool();
310 <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
311 pool
312 }
313 Err(e) => {
314 return Err(camel_api::CamelError::Config(format!(
315 "Failed to load jms config: {e}"
316 )));
317 }
318 }
319 };
320
321 let cxf_pool = {
322 let raw = camel_config
323 .components
324 .raw
325 .get("cxf")
326 .cloned()
327 .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
328 match <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::from_toml(
329 raw,
330 ) {
331 Ok(bundle) => {
332 let pool = bundle.pool();
333 <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
334 pool
335 }
336 Err(e) => {
337 return Err(camel_api::CamelError::Config(format!(
338 "Failed to load cxf config: {e}"
339 )));
340 }
341 }
342 };
343
344 #[cfg(feature = "kafka")]
345 register_bundle!(ctx, camel_config, camel_component_kafka::KafkaBundle);
346 register_bundle!(ctx, camel_config, camel_master::MasterBundle);
347 register_bundle!(
348 ctx,
349 camel_config,
350 camel_component_opensearch::OpenSearchBundle
351 );
352 register_bundle!(ctx, camel_config, camel_component_redis::RedisBundle);
353 register_bundle!(ctx, camel_config, camel_component_sql::SqlBundle);
354 #[cfg(feature = "grpc")]
355 register_bundle!(ctx, camel_config, camel_component_grpc::GrpcBundle);
356
357 #[cfg(feature = "wasm")]
358 {
359 let base_dir = std::path::Path::new(&config_path)
360 .parent()
361 .unwrap_or(std::path::Path::new("."))
362 .to_path_buf();
363 let wasm_bundle = camel_component_wasm::WasmBundle::new(ctx.registry_arc(), base_dir);
364 <camel_component_wasm::WasmBundle as camel_component_api::ComponentBundle>::register_all(
365 wasm_bundle,
366 &mut ctx,
367 );
368 }
369
370 ctx.register_language("js", Box::new(JsLanguage::new()))?;
373 ctx.register_language("javascript", Box::new(JsLanguage::new()))?;
374 ctx.register_language("rhai", Box::new(RhaiLanguage::new()))?;
375 ctx.register_language("jsonpath", Box::new(JsonPathLanguage::new()))?;
376 ctx.register_language("xpath", Box::new(XPathLanguage::new()))?;
377
378 match camel_dsl::discover_routes_with_threshold_and_security(
380 &patterns,
381 camel_config.stream_caching.threshold,
382 security_compile_context.clone(),
383 ) {
384 Ok(defs) => {
385 for def in defs {
386 let id = def.route_id().to_string();
387 if let Err(e) = ctx.add_route_definition(def).await {
388 tracing::error!("Failed to add route '{}': {}", id, e);
390 }
391 }
392 }
393 Err(e) => {
394 tracing::error!("Failed to discover routes: {}", e);
396 std::process::exit(1);
397 }
398 }
399
400 if let Err(e) = ctx.start().await {
402 tracing::error!("Failed to start CamelContext: {}", e);
404 std::process::exit(1);
405 }
406
407 tracing::info!("camel-cli: context started");
408
409 let watch_enabled = cli_watch.unwrap_or(camel_config.watch);
412
413 let watcher_shutdown = CancellationToken::new();
415 if watch_enabled {
416 let ctrl = ctx.runtime_execution_handle();
417 let watch_patterns = patterns.clone();
418 let watch_security_compile_context = security_compile_context.clone();
419 let drain_timeout = std::time::Duration::from_millis(camel_config.drain_timeout_ms);
420 let debounce = std::time::Duration::from_millis(camel_config.watch_debounce_ms);
421 let watcher_token = watcher_shutdown.clone();
422 tokio::spawn(async move {
423 let watch_dirs = camel_core::reload_watcher::resolve_watch_dirs(&watch_patterns);
424 let result = camel_core::reload_watcher::watch_and_reload(
425 watch_dirs,
426 ctrl,
427 move || {
428 camel_dsl::discover_routes_with_threshold_and_security(
429 &watch_patterns,
430 camel_config.stream_caching.threshold,
431 watch_security_compile_context.clone(),
432 )
433 .map_err(|e| camel_api::CamelError::RouteError(e.to_string()))
434 },
435 Some(watcher_token),
436 drain_timeout,
437 debounce,
438 )
439 .await;
440 if let Err(e) = result {
441 tracing::error!("File watcher failed: {}", e);
443 }
444 });
445 tracing::info!(
446 "camel-cli: hot-reload watching {:?}. Press Ctrl+C to stop.",
447 patterns
448 );
449 } else {
450 tracing::info!("camel-cli: running (hot-reload disabled). Press Ctrl+C to stop.");
451 }
452
453 tokio::select! {
454 _ = tokio::signal::ctrl_c() => tracing::info!("Received Ctrl+C"),
455 _ = async {
456 #[cfg(unix)]
457 {
458 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
459 .expect("Failed to install SIGTERM handler") .recv()
461 .await
462 }
463 #[cfg(not(unix))]
464 {
465 std::future::pending::<()>().await
466 }
467 } => tracing::info!("Received SIGTERM"),
468 }
469
470 let force_exit = tokio::spawn(async {
472 tokio::signal::ctrl_c().await.ok();
473 tracing::warn!("Second Ctrl+C — forcing exit");
474 std::process::exit(1);
475 });
476
477 tracing::info!("camel-cli: shutting down...");
478 watcher_shutdown.cancel();
479
480 jms_pool.begin_shutdown();
482 cxf_pool.begin_shutdown();
483
484 ctx.stop().await.unwrap_or_else(|e| {
486 tracing::error!("Error during shutdown: {}", e);
488 });
489
490 const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
492
493 match tokio::time::timeout(SHUTDOWN_TIMEOUT, jms_pool.shutdown()).await {
494 Ok(Ok(())) => {}
495 Ok(Err(e)) => {
496 tracing::error!("JMS pool shutdown failed: {}", e);
498 }
499 Err(_) => tracing::warn!("JMS pool shutdown timed out after 30s"),
500 }
501
502 match tokio::time::timeout(SHUTDOWN_TIMEOUT, cxf_pool.shutdown()).await {
503 Ok(Ok(())) => {}
504 Ok(Err(e)) => {
505 tracing::error!("CXF pool shutdown failed: {}", e);
507 }
508 Err(_) => tracing::warn!("CXF pool shutdown timed out after 30s"),
509 }
510
511 force_exit.abort();
512
513 tracing::info!("camel-cli: stopped");
514 Ok(())
515}