Skip to main content

camel_cli/commands/
run.rs

1//! `camel run` subcommand body.
2//!
3//! Owns the 7 ADR-0012 `error!` sites migrated in Phase C (see ADR-0012 +
4//! Phase C plan). Each site has a `// log-policy: …` annotation that
5//! classifies it per the ADR taxonomy.
6//!
7//! All sites in this file are category (c) or (d) — system-broken /
8//! bootstrap. The annotations are added in the Infra cluster task (Task 9),
9//! NOT in this split commit.
10
11use camel_api::datasource::DatasourceCatalog;
12use camel_bean::BeanProcessor;
13use camel_core::datasource::RuntimeDatasourceCatalog;
14use camel_language_js::JsLanguage;
15use camel_language_jsonpath::JsonPathLanguage;
16use camel_language_rhai::RhaiLanguage;
17use camel_language_xpath::XPathLanguage;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio_util::sync::CancellationToken;
21
22struct BridgeCleanup {
23    xslt: Arc<camel_xslt::XsltBridgeRuntime>,
24    xj: Arc<camel_xj::XjBridgeRuntime>,
25    validator: Option<Arc<camel_component_validator::xsd_bridge::XsdBridgeBackend>>,
26}
27
28#[async_trait::async_trait]
29impl camel_api::lifecycle::Lifecycle for BridgeCleanup {
30    fn name(&self) -> &str {
31        "bridge-cleanup"
32    }
33
34    async fn start(&mut self) -> Result<(), camel_api::CamelError> {
35        Ok(())
36    }
37
38    async fn stop(&mut self) -> Result<(), camel_api::CamelError> {
39        self.xslt.shutdown().await;
40        self.xj.shutdown().await;
41        if let Some(validator) = &self.validator {
42            validator.shutdown().await;
43        }
44        Ok(())
45    }
46}
47
48pub async fn run(
49    routes_override: Option<String>,
50    config_path: String,
51    cli_watch: Option<bool>,
52    otel: bool,
53    otel_endpoint: Option<String>,
54    service_name: Option<String>,
55    health_port: Option<u16>,
56) -> Result<(), camel_api::CamelError> {
57    // 1. Load config (fall back to empty config with serde defaults if Camel.toml not found)
58    let mut camel_config: camel_config::config::CamelConfig =
59        camel_config::config::CamelConfig::from_file(&config_path).unwrap_or_else(|_| {
60            // Build an empty config so serde defaults apply
61            config::Config::builder()
62                .build()
63                .and_then(|c| c.try_deserialize())
64                .unwrap_or_else(|e| {
65                    eprintln!("Failed to build default config: {e}");
66                    std::process::exit(1);
67                })
68        });
69
70    // 1b. Apply OTel CLI overrides (--otel-endpoint and --service-name imply --otel)
71    let otel_enabled = otel || otel_endpoint.is_some() || service_name.is_some();
72    if otel_enabled {
73        let otel_cfg =
74            camel_config
75                .observability
76                .otel
77                .get_or_insert(camel_config::OtelCamelConfig {
78                    enabled: true,
79                    endpoint: "http://localhost:4317".to_string(),
80                    service_name: "rust-camel".to_string(),
81                    ..Default::default()
82                });
83        otel_cfg.enabled = true;
84        if let Some(ep) = otel_endpoint {
85            otel_cfg.endpoint = ep;
86        }
87        if let Some(name) = service_name {
88            otel_cfg.service_name = name;
89        }
90    }
91
92    if let Some(port) = health_port {
93        let health_cfg = camel_config
94            .observability
95            .health
96            .get_or_insert(camel_config::config::HealthCamelConfig::default());
97        health_cfg.enabled = true;
98        health_cfg.port = port;
99    }
100
101    // 2. Build context with beans registry (also initialises tracing subscriber)
102    let beans_registry = {
103        let bean_reg = std::sync::Arc::new(std::sync::Mutex::new(camel_bean::BeanRegistry::new()));
104        if camel_config.beans.is_empty() {
105            None
106        } else {
107            Some(bean_reg)
108        }
109    };
110
111    let mut ctx = camel_config::config::CamelConfig::configure_context_with_beans(
112        &camel_config,
113        beans_registry.clone(),
114    )
115    .await
116    .unwrap_or_else(|e| {
117        eprintln!("Failed to configure CamelContext: {e}");
118        std::process::exit(1);
119    });
120
121    match camel_function::FunctionRuntimeService::with_default_container_provider(
122        camel_function::FunctionConfig::default(),
123    ) {
124        Ok(svc) => ctx = ctx.with_lifecycle(svc),
125        Err(e) => tracing::warn!("Function runtime disabled: {e}"),
126    }
127
128    // 3a. Create datasource catalog from configured datasources, wiring health registry
129    let datasource_catalog: Arc<dyn DatasourceCatalog> = {
130        let catalog = RuntimeDatasourceCatalog::new(camel_config.datasources.clone())
131            .with_health_registry(ctx.health_registry());
132        Arc::new(catalog)
133    };
134
135    // Load WASM beans after context is created (needs component registry)
136    #[cfg(feature = "wasm")]
137    if let Some(ref bean_reg) = beans_registry {
138        let component_registry = ctx.registry_arc();
139        let plugins_dir_raw = camel_config
140            .components
141            .raw
142            .get("wasm")
143            .and_then(|v| v.get("plugins_dir"))
144            .and_then(|v| v.as_str())
145            .unwrap_or("plugins");
146        let config_dir = std::path::Path::new(&config_path)
147            .parent()
148            .map(|p| {
149                if p.as_os_str().is_empty() {
150                    std::path::Path::new(".")
151                } else {
152                    p
153                }
154            })
155            .unwrap_or(std::path::Path::new("."));
156        let camel_root = config_dir.canonicalize().unwrap_or_else(|e| {
157            eprintln!("Error: cannot resolve project root: {e}");
158            std::process::exit(1);
159        });
160        crate::commands::plugin::validate_plugins_dir(&camel_root, plugins_dir_raw).unwrap_or_else(
161            |e| {
162                eprintln!("Error: invalid plugins_dir: {e}");
163                std::process::exit(1);
164            },
165        );
166        let plugins_dir = camel_root.join(plugins_dir_raw);
167        for (bean_name, bean_cfg) in &camel_config.beans {
168            tracing::info!(bean = %bean_name, plugin = %bean_cfg.plugin, "registering WASM bean");
169
170            if !bean_cfg
171                .plugin
172                .chars()
173                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
174            {
175                eprintln!(
176                    "Invalid bean plugin name '{}': must be alphanumeric with - or _",
177                    bean_cfg.plugin
178                );
179                std::process::exit(1);
180            }
181
182            let wasm_path = plugins_dir.join(format!("{}.wasm", bean_cfg.plugin));
183            let canonical_plugins = plugins_dir.canonicalize().unwrap_or_else(|_| {
184                eprintln!("Plugins directory not found: {}", plugins_dir.display());
185                std::process::exit(1);
186            });
187            let canonical_path = wasm_path.canonicalize().unwrap_or_else(|_| {
188                eprintln!("WASM bean plugin not found: {}", wasm_path.display());
189                std::process::exit(1);
190            });
191            if !canonical_path.starts_with(&canonical_plugins) {
192                eprintln!(
193                    "Bean plugin path escapes plugins directory: {}",
194                    bean_cfg.plugin
195                );
196                std::process::exit(1);
197            }
198            let wasm_config =
199                camel_component_wasm::config::WasmConfig::from_limits(&bean_cfg.limits);
200            let wasm_bean = camel_component_wasm::bean::WasmBean::new(
201                &wasm_path,
202                wasm_config,
203                component_registry.clone(),
204                bean_cfg.config.clone(),
205            )
206            .await
207            .unwrap_or_else(|e| {
208                eprintln!("Failed to load WASM bean '{}': {}", bean_name, e);
209                std::process::exit(1);
210            });
211            tracing::info!(
212                bean = %bean_name,
213                plugin = %bean_cfg.plugin,
214                methods = ?wasm_bean.methods(),
215                "WASM bean loaded"
216            );
217            bean_reg
218                .lock()
219                .expect("beans registry lock") // allow-unwrap
220                .register(bean_name, wasm_bean)
221                .unwrap_or_else(|e| {
222                    eprintln!("Bean registration failed for '{}': {}", bean_name, e);
223                    std::process::exit(1);
224                });
225        }
226    }
227
228    // 3. Determine route patterns
229    let patterns: Vec<String> = if let Some(p) = routes_override {
230        vec![p]
231    } else if !camel_config.routes.is_empty() {
232        camel_config.routes.clone()
233    } else {
234        vec!["routes/*.yaml".to_string()]
235    };
236
237    tracing::info!("camel-cli: loading routes from patterns: {:?}", patterns);
238
239    let security_compile_context =
240        crate::build_security_compile_context_from_config(&camel_config, ctx.registry_arc())
241            .await?;
242
243    // Define register_bundle! macro — looks up config key in ComponentsConfig::raw,
244    // falling back to an empty table so bundles always register with their serde defaults.
245    // Uses UFCS to invoke ComponentBundle methods without requiring trait in scope
246    macro_rules! register_bundle {
247        ($ctx:expr, $cfg:expr, $Bundle:ty) => {
248            let raw = $cfg
249                .components
250                .raw
251                .get(<$Bundle as camel_component_api::ComponentBundle>::config_key())
252                .cloned()
253                .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
254            match <$Bundle as camel_component_api::ComponentBundle>::from_toml(raw) {
255                Ok(bundle) => <$Bundle as camel_component_api::ComponentBundle>::register_all(
256                    bundle, &mut $ctx,
257                ),
258                Err(e) => {
259                    return Err(camel_api::CamelError::Config(format!(
260                        "Failed to load {} config: {}",
261                        <$Bundle as camel_component_api::ComponentBundle>::config_key(),
262                        e
263                    )));
264                }
265            }
266        };
267    }
268
269    // Register built-in components (no config needed)
270    ctx.register_component(camel_component_timer::TimerComponent::new());
271    ctx.register_component(camel_component_log::LogComponent::new());
272    ctx.register_component(camel_component_direct::DirectComponent::new());
273    ctx.register_component(camel_component_seda::SedaComponent::new());
274    ctx.register_component(camel_component_mock::MockComponent::new());
275    ctx.register_component(camel_component_controlbus::ControlBusComponent::new());
276    let validator_component = camel_component_validator::ValidatorComponent::new();
277    let validator_backend = validator_component.xsd_bridge_backend();
278    ctx.register_component(validator_component);
279
280    let xslt_component = camel_xslt::XsltComponent::default();
281    let xslt_runtime = xslt_component.bridge_runtime();
282    ctx.register_component(xslt_component);
283
284    let xj_component = camel_xj::XjComponent::default();
285    let xj_runtime = xj_component.bridge_runtime();
286    ctx.register_component(xj_component);
287
288    ctx = ctx.with_lifecycle(BridgeCleanup {
289        xslt: xslt_runtime,
290        xj: xj_runtime,
291        validator: validator_backend,
292    });
293
294    // Register HTTP, WS, File, Container (always-on in camel-cli, no feature flag)
295    register_bundle!(ctx, camel_config, camel_component_http::HttpBundle);
296    #[cfg(feature = "http-static")]
297    register_bundle!(ctx, camel_config, camel_component_http::HttpStaticBundle);
298    register_bundle!(ctx, camel_config, camel_component_ws::WsBundle);
299    register_bundle!(ctx, camel_config, camel_component_file::FileBundle);
300    register_bundle!(
301        ctx,
302        camel_config,
303        camel_component_container::ContainerBundle
304    );
305
306    // Register optional/feature-gated bundles
307    let jms_pool = {
308        let raw = camel_config
309            .components
310            .raw
311            .get("jms")
312            .cloned()
313            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
314        match <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::from_toml(
315            raw,
316        ) {
317            Ok(bundle) => {
318                let pool = bundle.pool();
319                <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
320                pool
321            }
322            Err(e) => {
323                return Err(camel_api::CamelError::Config(format!(
324                    "Failed to load jms config: {e}"
325                )));
326            }
327        }
328    };
329
330    let cxf_pool = {
331        let raw = camel_config
332            .components
333            .raw
334            .get("cxf")
335            .cloned()
336            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
337        match <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::from_toml(
338            raw,
339        ) {
340            Ok(bundle) => {
341                let pool = bundle.pool();
342                <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
343                pool
344            }
345            Err(e) => {
346                return Err(camel_api::CamelError::Config(format!(
347                    "Failed to load cxf config: {e}"
348                )));
349            }
350        }
351    };
352
353    #[cfg(feature = "kafka")]
354    register_bundle!(ctx, camel_config, camel_component_kafka::KafkaBundle);
355    register_bundle!(ctx, camel_config, camel_master::MasterBundle);
356    register_bundle!(
357        ctx,
358        camel_config,
359        camel_component_opensearch::OpenSearchBundle
360    );
361    register_bundle!(ctx, camel_config, camel_component_redis::RedisBundle);
362    {
363        let sql_raw = camel_config
364            .components
365            .raw
366            .get(<camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::config_key())
367            .cloned()
368            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
369        match <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::from_toml(
370            sql_raw,
371        ) {
372            Ok(bundle) => {
373                let bundle = bundle.with_catalog(Arc::clone(&datasource_catalog));
374                <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
375            }
376            Err(e) => {
377                // log-policy: system-broken
378                tracing::error!("failed to initialize SQL bundle: {}", e);
379            }
380        }
381    }
382    #[cfg(feature = "grpc")]
383    register_bundle!(ctx, camel_config, camel_component_grpc::GrpcBundle);
384
385    #[cfg(feature = "wasm")]
386    {
387        let base_dir = std::path::Path::new(&config_path)
388            .parent()
389            .unwrap_or(std::path::Path::new("."))
390            .to_path_buf();
391        let wasm_bundle = camel_component_wasm::WasmBundle::new(ctx.registry_arc(), base_dir);
392        <camel_component_wasm::WasmBundle as camel_component_api::ComponentBundle>::register_all(
393            wasm_bundle,
394            &mut ctx,
395        );
396    }
397
398    // Register language plugins bundled in camel-cli.
399    // These languages are optional in core, so the CLI wires them explicitly.
400    ctx.register_language("js", Box::new(JsLanguage::new()))?;
401    ctx.register_language("javascript", Box::new(JsLanguage::new()))?;
402    ctx.register_language("rhai", Box::new(RhaiLanguage::new()))?;
403    ctx.register_language("jsonpath", Box::new(JsonPathLanguage::new()))?;
404    ctx.register_language("xpath", Box::new(XPathLanguage::new()))?;
405
406    // 5. Discover and load initial routes
407    match camel_dsl::discover_routes_with_threshold_and_security(
408        &patterns,
409        camel_config.stream_caching.threshold,
410        security_compile_context.clone(),
411    ) {
412        Ok(defs) => {
413            for def in defs {
414                let id = def.route_id().to_string();
415                if let Err(e) = ctx.add_route_definition(def).await {
416                    // log-policy: system-broken
417                    tracing::error!("Failed to add route '{}': {}", id, e);
418                }
419            }
420        }
421        Err(e) => {
422            // log-policy: system-broken
423            tracing::error!("Failed to discover routes: {}", e);
424            std::process::exit(1);
425        }
426    }
427
428    // 6. Start context
429    if let Err(e) = ctx.start().await {
430        // log-policy: system-broken
431        tracing::error!("Failed to start CamelContext: {}", e);
432        std::process::exit(1);
433    }
434
435    tracing::info!("camel-cli: context started");
436
437    // 7. Resolve whether to enable the file watcher:
438    //    CLI flag takes precedence; falls back to Camel.toml `watch` field (default: false).
439    let watch_enabled = cli_watch.unwrap_or(camel_config.watch);
440
441    // 8. Optionally start file watcher in background
442    let watcher_shutdown = CancellationToken::new();
443    if watch_enabled {
444        let ctrl = ctx.runtime_execution_handle();
445        let watch_patterns = patterns.clone();
446        let watch_security_compile_context = security_compile_context.clone();
447        let drain_timeout = std::time::Duration::from_millis(camel_config.drain_timeout_ms);
448        let debounce = std::time::Duration::from_millis(camel_config.watch_debounce_ms);
449        let watcher_token = watcher_shutdown.clone();
450        tokio::spawn(async move {
451            let watch_dirs = camel_core::reload_watcher::resolve_watch_dirs(&watch_patterns);
452            let result = camel_core::reload_watcher::watch_and_reload(
453                watch_dirs,
454                ctrl,
455                move || {
456                    camel_dsl::discover_routes_with_threshold_and_security(
457                        &watch_patterns,
458                        camel_config.stream_caching.threshold,
459                        watch_security_compile_context.clone(),
460                    )
461                    .map_err(|e| camel_api::CamelError::RouteError(e.to_string()))
462                },
463                Some(watcher_token),
464                drain_timeout,
465                debounce,
466            )
467            .await;
468            if let Err(e) = result {
469                // log-policy: system-broken
470                tracing::error!("File watcher failed: {}", e);
471            }
472        });
473        tracing::info!(
474            "camel-cli: hot-reload watching {:?}. Press Ctrl+C to stop.",
475            patterns
476        );
477    } else {
478        tracing::info!("camel-cli: running (hot-reload disabled). Press Ctrl+C to stop.");
479    }
480
481    tokio::select! {
482        _ = tokio::signal::ctrl_c() => tracing::info!("Received Ctrl+C"),
483        _ = async {
484            #[cfg(unix)]
485            {
486                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
487                    .expect("Failed to install SIGTERM handler") // allow-unwrap
488                    .recv()
489                    .await
490            }
491            #[cfg(not(unix))]
492            {
493                std::future::pending::<()>().await
494            }
495        } => tracing::info!("Received SIGTERM"),
496    }
497
498    // Second Ctrl+C = force exit
499    let force_exit = tokio::spawn(async {
500        tokio::signal::ctrl_c().await.ok();
501        tracing::warn!("Second Ctrl+C — forcing exit");
502        std::process::exit(1);
503    });
504
505    tracing::info!("camel-cli: shutting down...");
506    watcher_shutdown.cancel();
507
508    // Signal pools to stop restarting BEFORE context shutdown
509    jms_pool.begin_shutdown();
510    cxf_pool.begin_shutdown();
511
512    // Stop context (routes + lifecycle services)
513    ctx.stop().await.unwrap_or_else(|e| {
514        // log-policy: system-broken
515        tracing::error!("Error during shutdown: {}", e);
516    });
517
518    // Tear down bridge pools with timeouts
519    const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
520
521    match tokio::time::timeout(SHUTDOWN_TIMEOUT, jms_pool.shutdown()).await {
522        Ok(Ok(())) => {}
523        Ok(Err(e)) => {
524            // log-policy: system-broken
525            tracing::error!("JMS pool shutdown failed: {}", e);
526        }
527        Err(_) => tracing::warn!("JMS pool shutdown timed out after 30s"),
528    }
529
530    match tokio::time::timeout(SHUTDOWN_TIMEOUT, cxf_pool.shutdown()).await {
531        Ok(Ok(())) => {}
532        Ok(Err(e)) => {
533            // log-policy: system-broken
534            tracing::error!("CXF pool shutdown failed: {}", e);
535        }
536        Err(_) => tracing::warn!("CXF pool shutdown timed out after 30s"),
537    }
538
539    force_exit.abort();
540
541    tracing::info!("camel-cli: stopped");
542    Ok(())
543}