Skip to main content

camel_cli/commands/
run.rs

1//! `camel run` subcommand body.
2//!
3//! Owns the 7 ADR-0012 `error!` sites migrated in Phase C (see ADR-0012 +
4//! Phase C plan). Each site has a `// log-policy: …` annotation that
5//! classifies it per the ADR taxonomy.
6//!
7//! All sites in this file are category (c) or (d) — system-broken /
8//! bootstrap. The annotations are added in the Infra cluster task (Task 9),
9//! NOT in this split commit.
10
11use camel_api::datasource::DatasourceCatalog;
12use camel_bean::BeanProcessor;
13use camel_core::datasource::RuntimeDatasourceCatalog;
14use camel_language_js::JsLanguage;
15use camel_language_jsonpath::JsonPathLanguage;
16use camel_language_rhai::RhaiLanguage;
17use camel_language_xpath::XPathLanguage;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio_util::sync::CancellationToken;
21
22struct BridgeCleanup {
23    xslt: Arc<camel_xslt::XsltBridgeRuntime>,
24    xj: Arc<camel_xj::XjBridgeRuntime>,
25    validator: Option<Arc<camel_component_validator::xsd_bridge::XsdBridgeBackend>>,
26}
27
28#[async_trait::async_trait]
29impl camel_api::lifecycle::Lifecycle for BridgeCleanup {
30    fn name(&self) -> &str {
31        "bridge-cleanup"
32    }
33
34    async fn start(&mut self) -> Result<(), camel_api::CamelError> {
35        Ok(())
36    }
37
38    async fn stop(&mut self) -> Result<(), camel_api::CamelError> {
39        self.xslt.shutdown().await;
40        self.xj.shutdown().await;
41        if let Some(validator) = &self.validator {
42            validator.shutdown().await;
43        }
44        Ok(())
45    }
46}
47
48pub async fn run(
49    routes_override: Option<String>,
50    config_path: String,
51    cli_watch: Option<bool>,
52    otel: bool,
53    otel_endpoint: Option<String>,
54    service_name: Option<String>,
55    health_port: Option<u16>,
56) -> Result<(), camel_api::CamelError> {
57    // 1. Load config (fall back to empty config with serde defaults if Camel.toml not found)
58    let mut camel_config: camel_config::config::CamelConfig =
59        camel_config::config::CamelConfig::from_file(&config_path).unwrap_or_else(|_| {
60            // Build an empty config so serde defaults apply
61            config::Config::builder()
62                .build()
63                .and_then(|c| c.try_deserialize())
64                .unwrap_or_else(|e| {
65                    eprintln!("Failed to build default config: {e}");
66                    std::process::exit(1);
67                })
68        });
69
70    // 1b. Apply OTel CLI overrides (--otel-endpoint and --service-name imply --otel)
71    let otel_enabled = otel || otel_endpoint.is_some() || service_name.is_some();
72    if otel_enabled {
73        let otel_cfg =
74            camel_config
75                .observability
76                .otel
77                .get_or_insert(camel_config::OtelCamelConfig {
78                    enabled: true,
79                    endpoint: "http://localhost:4317".to_string(),
80                    service_name: "rust-camel".to_string(),
81                    ..Default::default()
82                });
83        otel_cfg.enabled = true;
84        if let Some(ep) = otel_endpoint {
85            otel_cfg.endpoint = ep;
86        }
87        if let Some(name) = service_name {
88            otel_cfg.service_name = name;
89        }
90    }
91
92    if let Some(port) = health_port {
93        let health_cfg = camel_config
94            .observability
95            .health
96            .get_or_insert(camel_config::config::HealthCamelConfig::default());
97        health_cfg.enabled = true;
98        health_cfg.port = port;
99    }
100
101    // 2. Build context with beans registry (also initialises tracing subscriber)
102    let beans_registry = {
103        let bean_reg = std::sync::Arc::new(std::sync::Mutex::new(camel_bean::BeanRegistry::new()));
104        if camel_config.beans.is_empty() {
105            None
106        } else {
107            Some(bean_reg)
108        }
109    };
110
111    let mut ctx = camel_config::config::CamelConfig::configure_context_with_beans(
112        &camel_config,
113        beans_registry.clone(),
114    )
115    .await
116    .unwrap_or_else(|e| {
117        eprintln!("Failed to configure CamelContext: {e}");
118        std::process::exit(1);
119    });
120
121    match camel_function::FunctionRuntimeService::with_default_container_provider(
122        camel_function::FunctionConfig::default(),
123    ) {
124        Ok(svc) => ctx = ctx.with_lifecycle(svc),
125        Err(e) => tracing::warn!("Function runtime disabled: {e}"),
126    }
127
128    // 3a. Create datasource catalog from configured datasources, wiring health registry
129    let datasource_catalog: Arc<dyn DatasourceCatalog> = {
130        let catalog = RuntimeDatasourceCatalog::new(camel_config.datasources.clone())
131            .with_health_registry(ctx.health_registry());
132        Arc::new(catalog)
133    };
134
135    // Load WASM beans after context is created (needs component registry)
136    #[cfg(feature = "wasm")]
137    if let Some(ref bean_reg) = beans_registry {
138        let component_registry = ctx.registry_arc();
139        let plugins_dir_raw = camel_config
140            .components
141            .raw
142            .get("wasm")
143            .and_then(|v| v.get("plugins_dir"))
144            .and_then(|v| v.as_str())
145            .unwrap_or("plugins");
146        let config_dir = std::path::Path::new(&config_path)
147            .parent()
148            .map(|p| {
149                if p.as_os_str().is_empty() {
150                    std::path::Path::new(".")
151                } else {
152                    p
153                }
154            })
155            .unwrap_or(std::path::Path::new("."));
156        let camel_root = config_dir.canonicalize().unwrap_or_else(|e| {
157            eprintln!("Error: cannot resolve project root: {e}");
158            std::process::exit(1);
159        });
160        crate::commands::plugin::validate_plugins_dir(&camel_root, plugins_dir_raw).unwrap_or_else(
161            |e| {
162                eprintln!("Error: invalid plugins_dir: {e}");
163                std::process::exit(1);
164            },
165        );
166        let plugins_dir = camel_root.join(plugins_dir_raw);
167        for (bean_name, bean_cfg) in &camel_config.beans {
168            tracing::info!(bean = %bean_name, plugin = %bean_cfg.plugin, "registering WASM bean");
169
170            if !bean_cfg
171                .plugin
172                .chars()
173                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
174            {
175                eprintln!(
176                    "Invalid bean plugin name '{}': must be alphanumeric with - or _",
177                    bean_cfg.plugin
178                );
179                std::process::exit(1);
180            }
181
182            let wasm_path = plugins_dir.join(format!("{}.wasm", bean_cfg.plugin));
183            let canonical_plugins = plugins_dir.canonicalize().unwrap_or_else(|_| {
184                eprintln!("Plugins directory not found: {}", plugins_dir.display());
185                std::process::exit(1);
186            });
187            let canonical_path = wasm_path.canonicalize().unwrap_or_else(|_| {
188                eprintln!("WASM bean plugin not found: {}", wasm_path.display());
189                std::process::exit(1);
190            });
191            if !canonical_path.starts_with(&canonical_plugins) {
192                eprintln!(
193                    "Bean plugin path escapes plugins directory: {}",
194                    bean_cfg.plugin
195                );
196                std::process::exit(1);
197            }
198            let wasm_config =
199                camel_component_wasm::config::WasmConfig::from_limits(&bean_cfg.limits);
200            let wasm_bean = camel_component_wasm::bean::WasmBean::new(
201                &wasm_path,
202                wasm_config,
203                component_registry.clone(),
204                bean_cfg.config.clone(),
205            )
206            .await
207            .unwrap_or_else(|e| {
208                eprintln!("Failed to load WASM bean '{}': {}", bean_name, e);
209                std::process::exit(1);
210            });
211            tracing::info!(
212                bean = %bean_name,
213                plugin = %bean_cfg.plugin,
214                methods = ?wasm_bean.methods(),
215                "WASM bean loaded"
216            );
217            bean_reg
218                .lock()
219                .expect("beans registry lock") // allow-unwrap
220                .register(bean_name, wasm_bean)
221                .unwrap_or_else(|e| {
222                    eprintln!("Bean registration failed for '{}': {}", bean_name, e);
223                    std::process::exit(1);
224                });
225        }
226    }
227
228    // 3. Determine route patterns
229    let patterns: Vec<String> = if let Some(p) = routes_override {
230        vec![p]
231    } else if !camel_config.routes.is_empty() {
232        camel_config.routes.clone()
233    } else {
234        vec!["routes/*.yaml".to_string()]
235    };
236
237    tracing::info!("camel-cli: loading routes from patterns: {:?}", patterns);
238
239    let security_compile_context =
240        crate::build_security_compile_context_from_config(&camel_config, ctx.registry_arc())
241            .await?;
242
243    // Define register_bundle! macro — looks up config key in ComponentsConfig::raw,
244    // falling back to an empty table so bundles always register with their serde defaults.
245    // Uses UFCS to invoke ComponentBundle methods without requiring trait in scope
246    macro_rules! register_bundle {
247        ($ctx:expr, $cfg:expr, $Bundle:ty) => {
248            let raw = $cfg
249                .components
250                .raw
251                .get(<$Bundle as camel_component_api::ComponentBundle>::config_key())
252                .cloned()
253                .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
254            match <$Bundle as camel_component_api::ComponentBundle>::from_toml(raw) {
255                Ok(bundle) => <$Bundle as camel_component_api::ComponentBundle>::register_all(
256                    bundle, &mut $ctx,
257                ),
258                Err(e) => {
259                    return Err(camel_api::CamelError::Config(format!(
260                        "Failed to load {} config: {}",
261                        <$Bundle as camel_component_api::ComponentBundle>::config_key(),
262                        e
263                    )));
264                }
265            }
266        };
267    }
268
269    // Register built-in components (no config needed)
270    ctx.register_component(camel_component_timer::TimerComponent::new());
271    ctx.register_component(camel_component_log::LogComponent::new());
272    ctx.register_component(camel_component_direct::DirectComponent::new());
273    ctx.register_component(camel_component_seda::SedaComponent::new());
274    ctx.register_component(camel_component_mock::MockComponent::new());
275    ctx.register_component(camel_component_controlbus::ControlBusComponent::new());
276    let validator_component = camel_component_validator::ValidatorComponent::new();
277    let validator_backend = validator_component.xsd_bridge_backend();
278    ctx.register_component(validator_component);
279
280    let xslt_component = camel_xslt::XsltComponent::default();
281    let xslt_runtime = xslt_component.bridge_runtime();
282    ctx.register_component(xslt_component);
283
284    let xj_component = camel_xj::XjComponent::default();
285    let xj_runtime = xj_component.bridge_runtime();
286    ctx.register_component(xj_component);
287
288    ctx = ctx.with_lifecycle(BridgeCleanup {
289        xslt: xslt_runtime,
290        xj: xj_runtime,
291        validator: validator_backend,
292    });
293
294    // Register HTTP, WS, File, Container (always-on in camel-cli, no feature flag)
295    register_bundle!(ctx, camel_config, camel_component_http::HttpBundle);
296    #[cfg(feature = "http-static")]
297    register_bundle!(ctx, camel_config, camel_component_http::HttpStaticBundle);
298    register_bundle!(ctx, camel_config, camel_component_ws::WsBundle);
299    register_bundle!(ctx, camel_config, camel_component_file::FileBundle);
300    register_bundle!(
301        ctx,
302        camel_config,
303        camel_component_container::ContainerBundle
304    );
305
306    // Register optional/feature-gated bundles
307    let jms_pool = {
308        let raw = camel_config
309            .components
310            .raw
311            .get("jms")
312            .cloned()
313            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
314        match <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::from_toml(
315            raw,
316        ) {
317            Ok(bundle) => {
318                let pool = bundle.pool();
319                <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
320                pool
321            }
322            Err(e) => {
323                return Err(camel_api::CamelError::Config(format!(
324                    "Failed to load jms config: {e}"
325                )));
326            }
327        }
328    };
329
330    let cxf_pool = {
331        let raw = camel_config
332            .components
333            .raw
334            .get("cxf")
335            .cloned()
336            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
337        match <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::from_toml(
338            raw,
339        ) {
340            Ok(bundle) => {
341                let pool = bundle.pool();
342                <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
343                pool
344            }
345            Err(e) => {
346                return Err(camel_api::CamelError::Config(format!(
347                    "Failed to load cxf config: {e}"
348                )));
349            }
350        }
351    };
352
353    #[cfg(feature = "kafka")]
354    register_bundle!(ctx, camel_config, camel_component_kafka::KafkaBundle);
355    register_bundle!(ctx, camel_config, camel_master::MasterBundle);
356    register_bundle!(
357        ctx,
358        camel_config,
359        camel_component_opensearch::OpenSearchBundle
360    );
361    register_bundle!(ctx, camel_config, camel_component_redis::RedisBundle);
362    {
363        let sql_raw = camel_config
364            .components
365            .raw
366            .get(<camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::config_key())
367            .cloned()
368            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
369        match <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::from_toml(
370            sql_raw,
371        ) {
372            Ok(bundle) => {
373                let bundle = bundle.with_catalog(Arc::clone(&datasource_catalog));
374                <camel_component_sql::SqlBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
375            }
376            Err(e) => {
377                // log-policy: system-broken
378                tracing::error!("failed to initialize SQL bundle: {}", e);
379            }
380        }
381    }
382    #[cfg(feature = "surrealdb")]
383    {
384        let surrealdb_raw = camel_config
385            .components
386            .raw
387            .get(<camel_component_surrealdb::SurrealDbBundle as camel_component_api::ComponentBundle>::config_key())
388            .cloned()
389            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
390        match <camel_component_surrealdb::SurrealDbBundle as camel_component_api::ComponentBundle>::from_toml(
391            surrealdb_raw,
392        ) {
393            Ok(bundle) => {
394                let bundle = bundle.with_catalog(Arc::clone(&datasource_catalog));
395                <camel_component_surrealdb::SurrealDbBundle as camel_component_api::ComponentBundle>::register_all(
396                    bundle, &mut ctx,
397                );
398            }
399            Err(e) => {
400                // log-policy: system-broken
401                tracing::error!("failed to initialize SurrealDB bundle: {}", e);
402            }
403        }
404    }
405    #[cfg(feature = "grpc")]
406    register_bundle!(ctx, camel_config, camel_component_grpc::GrpcBundle);
407
408    #[cfg(feature = "llm")]
409    register_bundle!(ctx, camel_config, camel_component_llm::LlmBundle);
410
411    #[cfg(feature = "wasm")]
412    {
413        let base_dir = std::path::Path::new(&config_path)
414            .parent()
415            .unwrap_or(std::path::Path::new("."))
416            .to_path_buf();
417        let wasm_bundle = camel_component_wasm::WasmBundle::new(ctx.registry_arc(), base_dir);
418        <camel_component_wasm::WasmBundle as camel_component_api::ComponentBundle>::register_all(
419            wasm_bundle,
420            &mut ctx,
421        );
422    }
423
424    // Register language plugins bundled in camel-cli.
425    // These languages are optional in core, so the CLI wires them explicitly.
426    ctx.register_language("js", Box::new(JsLanguage::new()))?;
427    ctx.register_language("javascript", Box::new(JsLanguage::new()))?;
428    ctx.register_language("rhai", Box::new(RhaiLanguage::new()))?;
429    ctx.register_language("jsonpath", Box::new(JsonPathLanguage::new()))?;
430    ctx.register_language("xpath", Box::new(XPathLanguage::new()))?;
431
432    // 5. Discover and load initial routes
433    match camel_dsl::discover_routes_with_threshold_and_security(
434        &patterns,
435        camel_config.stream_caching.threshold,
436        security_compile_context.clone(),
437    ) {
438        Ok(defs) => {
439            for def in defs {
440                let id = def.route_id().to_string();
441                if let Err(e) = ctx.add_route_definition(def).await {
442                    // log-policy: system-broken
443                    tracing::error!("Failed to add route '{}': {}", id, e);
444                }
445            }
446        }
447        Err(e) => {
448            // log-policy: system-broken
449            tracing::error!("Failed to discover routes: {}", e);
450            std::process::exit(1);
451        }
452    }
453
454    // 6. Start context
455    if let Err(e) = ctx.start().await {
456        // log-policy: system-broken
457        tracing::error!("Failed to start CamelContext: {}", e);
458        std::process::exit(1);
459    }
460
461    tracing::info!("camel-cli: context started");
462
463    // 7. Resolve whether to enable the file watcher:
464    //    CLI flag takes precedence; falls back to Camel.toml `watch` field (default: false).
465    let watch_enabled = cli_watch.unwrap_or(camel_config.watch);
466
467    // 8. Optionally start file watcher in background
468    let watcher_shutdown = CancellationToken::new();
469    if watch_enabled {
470        let ctrl = ctx.runtime_execution_handle();
471        let watch_patterns = patterns.clone();
472        let watch_security_compile_context = security_compile_context.clone();
473        let drain_timeout = std::time::Duration::from_millis(camel_config.drain_timeout_ms);
474        let debounce = std::time::Duration::from_millis(camel_config.watch_debounce_ms);
475        let watcher_token = watcher_shutdown.clone();
476        tokio::spawn(async move {
477            let watch_dirs = camel_core::reload_watcher::resolve_watch_dirs(&watch_patterns);
478            let result = camel_core::reload_watcher::watch_and_reload(
479                watch_dirs,
480                ctrl,
481                move || {
482                    camel_dsl::discover_routes_with_threshold_and_security(
483                        &watch_patterns,
484                        camel_config.stream_caching.threshold,
485                        watch_security_compile_context.clone(),
486                    )
487                    .map_err(|e| camel_api::CamelError::RouteError(e.to_string()))
488                },
489                Some(watcher_token),
490                drain_timeout,
491                debounce,
492            )
493            .await;
494            if let Err(e) = result {
495                // log-policy: system-broken
496                tracing::error!("File watcher failed: {}", e);
497            }
498        });
499        tracing::info!(
500            "camel-cli: hot-reload watching {:?}. Press Ctrl+C to stop.",
501            patterns
502        );
503    } else {
504        tracing::info!("camel-cli: running (hot-reload disabled). Press Ctrl+C to stop.");
505    }
506
507    tokio::select! {
508        _ = tokio::signal::ctrl_c() => tracing::info!("Received Ctrl+C"),
509        _ = async {
510            #[cfg(unix)]
511            {
512                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
513                    .expect("Failed to install SIGTERM handler") // allow-unwrap
514                    .recv()
515                    .await
516            }
517            #[cfg(not(unix))]
518            {
519                std::future::pending::<()>().await
520            }
521        } => tracing::info!("Received SIGTERM"),
522    }
523
524    // Second Ctrl+C = force exit
525    let force_exit = tokio::spawn(async {
526        tokio::signal::ctrl_c().await.ok();
527        tracing::warn!("Second Ctrl+C — forcing exit");
528        std::process::exit(1);
529    });
530
531    tracing::info!("camel-cli: shutting down...");
532    watcher_shutdown.cancel();
533
534    // Signal pools to stop restarting BEFORE context shutdown
535    jms_pool.begin_shutdown();
536    cxf_pool.begin_shutdown();
537
538    // Stop context (routes + lifecycle services)
539    ctx.stop().await.unwrap_or_else(|e| {
540        // log-policy: system-broken
541        tracing::error!("Error during shutdown: {}", e);
542    });
543
544    // Tear down bridge pools with timeouts
545    const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
546
547    match tokio::time::timeout(SHUTDOWN_TIMEOUT, jms_pool.shutdown()).await {
548        Ok(Ok(())) => {}
549        Ok(Err(e)) => {
550            // log-policy: system-broken
551            tracing::error!("JMS pool shutdown failed: {}", e);
552        }
553        Err(_) => tracing::warn!("JMS pool shutdown timed out after 30s"),
554    }
555
556    match tokio::time::timeout(SHUTDOWN_TIMEOUT, cxf_pool.shutdown()).await {
557        Ok(Ok(())) => {}
558        Ok(Err(e)) => {
559            // log-policy: system-broken
560            tracing::error!("CXF pool shutdown failed: {}", e);
561        }
562        Err(_) => tracing::warn!("CXF pool shutdown timed out after 30s"),
563    }
564
565    force_exit.abort();
566
567    tracing::info!("camel-cli: stopped");
568    Ok(())
569}