Skip to main content

camel_cli/commands/
run.rs

1//! `camel run` subcommand body.
2//!
3//! Owns the 7 ADR-0012 `error!` sites migrated in Phase C (see ADR-0012 +
4//! Phase C plan). Each site has a `// log-policy: …` annotation that
5//! classifies it per the ADR taxonomy.
6//!
7//! All sites in this file are category (c) or (d) — system-broken /
8//! bootstrap. The annotations are added in the Infra cluster task (Task 9),
9//! NOT in this split commit.
10
11use camel_bean::BeanProcessor;
12use camel_language_js::JsLanguage;
13use camel_language_jsonpath::JsonPathLanguage;
14use camel_language_rhai::RhaiLanguage;
15use camel_language_xpath::XPathLanguage;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio_util::sync::CancellationToken;
19
20struct BridgeCleanup {
21    xslt: Arc<camel_xslt::XsltBridgeRuntime>,
22    xj: Arc<camel_xj::XjBridgeRuntime>,
23    validator: Option<Arc<camel_component_validator::xsd_bridge::XsdBridgeBackend>>,
24}
25
26#[async_trait::async_trait]
27impl camel_api::lifecycle::Lifecycle for BridgeCleanup {
28    fn name(&self) -> &str {
29        "bridge-cleanup"
30    }
31
32    async fn start(&mut self) -> Result<(), camel_api::CamelError> {
33        Ok(())
34    }
35
36    async fn stop(&mut self) -> Result<(), camel_api::CamelError> {
37        self.xslt.shutdown().await;
38        self.xj.shutdown().await;
39        if let Some(validator) = &self.validator {
40            validator.shutdown().await;
41        }
42        Ok(())
43    }
44}
45
46pub async fn run(
47    routes_override: Option<String>,
48    config_path: String,
49    cli_watch: Option<bool>,
50    otel: bool,
51    otel_endpoint: Option<String>,
52    service_name: Option<String>,
53    health_port: Option<u16>,
54) -> Result<(), camel_api::CamelError> {
55    // 1. Load config (fall back to empty config with serde defaults if Camel.toml not found)
56    let mut camel_config: camel_config::config::CamelConfig =
57        camel_config::config::CamelConfig::from_file(&config_path).unwrap_or_else(|_| {
58            // Build an empty config so serde defaults apply
59            config::Config::builder()
60                .build()
61                .and_then(|c| c.try_deserialize())
62                .unwrap_or_else(|e| {
63                    eprintln!("Failed to build default config: {e}");
64                    std::process::exit(1);
65                })
66        });
67
68    // 1b. Apply OTel CLI overrides (--otel-endpoint and --service-name imply --otel)
69    let otel_enabled = otel || otel_endpoint.is_some() || service_name.is_some();
70    if otel_enabled {
71        let otel_cfg =
72            camel_config
73                .observability
74                .otel
75                .get_or_insert(camel_config::OtelCamelConfig {
76                    enabled: true,
77                    endpoint: "http://localhost:4317".to_string(),
78                    service_name: "rust-camel".to_string(),
79                    ..Default::default()
80                });
81        otel_cfg.enabled = true;
82        if let Some(ep) = otel_endpoint {
83            otel_cfg.endpoint = ep;
84        }
85        if let Some(name) = service_name {
86            otel_cfg.service_name = name;
87        }
88    }
89
90    if let Some(port) = health_port {
91        let health_cfg = camel_config
92            .observability
93            .health
94            .get_or_insert(camel_config::config::HealthCamelConfig::default());
95        health_cfg.enabled = true;
96        health_cfg.port = port;
97    }
98
99    // 2. Build context with beans registry (also initialises tracing subscriber)
100    let beans_registry = {
101        let bean_reg = std::sync::Arc::new(std::sync::Mutex::new(camel_bean::BeanRegistry::new()));
102        if camel_config.beans.is_empty() {
103            None
104        } else {
105            Some(bean_reg)
106        }
107    };
108
109    let mut ctx = camel_config::config::CamelConfig::configure_context_with_beans(
110        &camel_config,
111        beans_registry.clone(),
112    )
113    .await
114    .unwrap_or_else(|e| {
115        eprintln!("Failed to configure CamelContext: {e}");
116        std::process::exit(1);
117    });
118
119    match camel_function::FunctionRuntimeService::with_default_container_provider(
120        camel_function::FunctionConfig::default(),
121    ) {
122        Ok(svc) => ctx = ctx.with_lifecycle(svc),
123        Err(e) => tracing::warn!("Function runtime disabled: {e}"),
124    }
125
126    // Load WASM beans after context is created (needs component registry)
127    #[cfg(feature = "wasm")]
128    if let Some(ref bean_reg) = beans_registry {
129        let component_registry = ctx.registry_arc();
130        let plugins_dir_raw = camel_config
131            .components
132            .raw
133            .get("wasm")
134            .and_then(|v| v.get("plugins_dir"))
135            .and_then(|v| v.as_str())
136            .unwrap_or("plugins");
137        let config_dir = std::path::Path::new(&config_path)
138            .parent()
139            .map(|p| {
140                if p.as_os_str().is_empty() {
141                    std::path::Path::new(".")
142                } else {
143                    p
144                }
145            })
146            .unwrap_or(std::path::Path::new("."));
147        let camel_root = config_dir.canonicalize().unwrap_or_else(|e| {
148            eprintln!("Error: cannot resolve project root: {e}");
149            std::process::exit(1);
150        });
151        crate::commands::plugin::validate_plugins_dir(&camel_root, plugins_dir_raw).unwrap_or_else(
152            |e| {
153                eprintln!("Error: invalid plugins_dir: {e}");
154                std::process::exit(1);
155            },
156        );
157        let plugins_dir = camel_root.join(plugins_dir_raw);
158        for (bean_name, bean_cfg) in &camel_config.beans {
159            tracing::info!(bean = %bean_name, plugin = %bean_cfg.plugin, "registering WASM bean");
160
161            if !bean_cfg
162                .plugin
163                .chars()
164                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
165            {
166                eprintln!(
167                    "Invalid bean plugin name '{}': must be alphanumeric with - or _",
168                    bean_cfg.plugin
169                );
170                std::process::exit(1);
171            }
172
173            let wasm_path = plugins_dir.join(format!("{}.wasm", bean_cfg.plugin));
174            let canonical_plugins = plugins_dir.canonicalize().unwrap_or_else(|_| {
175                eprintln!("Plugins directory not found: {}", plugins_dir.display());
176                std::process::exit(1);
177            });
178            let canonical_path = wasm_path.canonicalize().unwrap_or_else(|_| {
179                eprintln!("WASM bean plugin not found: {}", wasm_path.display());
180                std::process::exit(1);
181            });
182            if !canonical_path.starts_with(&canonical_plugins) {
183                eprintln!(
184                    "Bean plugin path escapes plugins directory: {}",
185                    bean_cfg.plugin
186                );
187                std::process::exit(1);
188            }
189            let wasm_config =
190                camel_component_wasm::config::WasmConfig::from_limits(&bean_cfg.limits);
191            let wasm_bean = camel_component_wasm::bean::WasmBean::new(
192                &wasm_path,
193                wasm_config,
194                component_registry.clone(),
195                bean_cfg.config.clone(),
196            )
197            .await
198            .unwrap_or_else(|e| {
199                eprintln!("Failed to load WASM bean '{}': {}", bean_name, e);
200                std::process::exit(1);
201            });
202            tracing::info!(
203                bean = %bean_name,
204                plugin = %bean_cfg.plugin,
205                methods = ?wasm_bean.methods(),
206                "WASM bean loaded"
207            );
208            bean_reg
209                .lock()
210                .expect("beans registry lock") // allow-unwrap
211                .register(bean_name, wasm_bean)
212                .unwrap_or_else(|e| {
213                    eprintln!("Bean registration failed for '{}': {}", bean_name, e);
214                    std::process::exit(1);
215                });
216        }
217    }
218
219    // 3. Determine route patterns
220    let patterns: Vec<String> = if let Some(p) = routes_override {
221        vec![p]
222    } else if !camel_config.routes.is_empty() {
223        camel_config.routes.clone()
224    } else {
225        vec!["routes/*.yaml".to_string()]
226    };
227
228    tracing::info!("camel-cli: loading routes from patterns: {:?}", patterns);
229
230    let security_compile_context =
231        crate::build_security_compile_context_from_config(&camel_config, ctx.registry_arc())
232            .await?;
233
234    // Define register_bundle! macro — looks up config key in ComponentsConfig::raw,
235    // falling back to an empty table so bundles always register with their serde defaults.
236    // Uses UFCS to invoke ComponentBundle methods without requiring trait in scope
237    macro_rules! register_bundle {
238        ($ctx:expr, $cfg:expr, $Bundle:ty) => {
239            let raw = $cfg
240                .components
241                .raw
242                .get(<$Bundle as camel_component_api::ComponentBundle>::config_key())
243                .cloned()
244                .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
245            match <$Bundle as camel_component_api::ComponentBundle>::from_toml(raw) {
246                Ok(bundle) => <$Bundle as camel_component_api::ComponentBundle>::register_all(
247                    bundle, &mut $ctx,
248                ),
249                Err(e) => {
250                    return Err(camel_api::CamelError::Config(format!(
251                        "Failed to load {} config: {}",
252                        <$Bundle as camel_component_api::ComponentBundle>::config_key(),
253                        e
254                    )));
255                }
256            }
257        };
258    }
259
260    // Register built-in components (no config needed)
261    ctx.register_component(camel_component_timer::TimerComponent::new());
262    ctx.register_component(camel_component_log::LogComponent::new());
263    ctx.register_component(camel_component_direct::DirectComponent::new());
264    ctx.register_component(camel_component_seda::SedaComponent::new());
265    ctx.register_component(camel_component_mock::MockComponent::new());
266    ctx.register_component(camel_component_controlbus::ControlBusComponent::new());
267    let validator_component = camel_component_validator::ValidatorComponent::new();
268    let validator_backend = validator_component.xsd_bridge_backend();
269    ctx.register_component(validator_component);
270
271    let xslt_component = camel_xslt::XsltComponent::default();
272    let xslt_runtime = xslt_component.bridge_runtime();
273    ctx.register_component(xslt_component);
274
275    let xj_component = camel_xj::XjComponent::default();
276    let xj_runtime = xj_component.bridge_runtime();
277    ctx.register_component(xj_component);
278
279    ctx = ctx.with_lifecycle(BridgeCleanup {
280        xslt: xslt_runtime,
281        xj: xj_runtime,
282        validator: validator_backend,
283    });
284
285    // Register HTTP, WS, File, Container (always-on in camel-cli, no feature flag)
286    register_bundle!(ctx, camel_config, camel_component_http::HttpBundle);
287    #[cfg(feature = "http-static")]
288    register_bundle!(ctx, camel_config, camel_component_http::HttpStaticBundle);
289    register_bundle!(ctx, camel_config, camel_component_ws::WsBundle);
290    register_bundle!(ctx, camel_config, camel_component_file::FileBundle);
291    register_bundle!(
292        ctx,
293        camel_config,
294        camel_component_container::ContainerBundle
295    );
296
297    // Register optional/feature-gated bundles
298    let jms_pool = {
299        let raw = camel_config
300            .components
301            .raw
302            .get("jms")
303            .cloned()
304            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
305        match <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::from_toml(
306            raw,
307        ) {
308            Ok(bundle) => {
309                let pool = bundle.pool();
310                <camel_component_jms::JmsBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
311                pool
312            }
313            Err(e) => {
314                return Err(camel_api::CamelError::Config(format!(
315                    "Failed to load jms config: {e}"
316                )));
317            }
318        }
319    };
320
321    let cxf_pool = {
322        let raw = camel_config
323            .components
324            .raw
325            .get("cxf")
326            .cloned()
327            .unwrap_or_else(|| toml::Value::Table(toml::map::Map::new()));
328        match <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::from_toml(
329            raw,
330        ) {
331            Ok(bundle) => {
332                let pool = bundle.pool();
333                <camel_component_cxf::CxfBundle as camel_component_api::ComponentBundle>::register_all(bundle, &mut ctx);
334                pool
335            }
336            Err(e) => {
337                return Err(camel_api::CamelError::Config(format!(
338                    "Failed to load cxf config: {e}"
339                )));
340            }
341        }
342    };
343
344    #[cfg(feature = "kafka")]
345    register_bundle!(ctx, camel_config, camel_component_kafka::KafkaBundle);
346    register_bundle!(ctx, camel_config, camel_master::MasterBundle);
347    register_bundle!(
348        ctx,
349        camel_config,
350        camel_component_opensearch::OpenSearchBundle
351    );
352    register_bundle!(ctx, camel_config, camel_component_redis::RedisBundle);
353    register_bundle!(ctx, camel_config, camel_component_sql::SqlBundle);
354    #[cfg(feature = "grpc")]
355    register_bundle!(ctx, camel_config, camel_component_grpc::GrpcBundle);
356
357    #[cfg(feature = "wasm")]
358    {
359        let base_dir = std::path::Path::new(&config_path)
360            .parent()
361            .unwrap_or(std::path::Path::new("."))
362            .to_path_buf();
363        let wasm_bundle = camel_component_wasm::WasmBundle::new(ctx.registry_arc(), base_dir);
364        <camel_component_wasm::WasmBundle as camel_component_api::ComponentBundle>::register_all(
365            wasm_bundle,
366            &mut ctx,
367        );
368    }
369
370    // Register language plugins bundled in camel-cli.
371    // These languages are optional in core, so the CLI wires them explicitly.
372    ctx.register_language("js", Box::new(JsLanguage::new()))?;
373    ctx.register_language("javascript", Box::new(JsLanguage::new()))?;
374    ctx.register_language("rhai", Box::new(RhaiLanguage::new()))?;
375    ctx.register_language("jsonpath", Box::new(JsonPathLanguage::new()))?;
376    ctx.register_language("xpath", Box::new(XPathLanguage::new()))?;
377
378    // 5. Discover and load initial routes
379    match camel_dsl::discover_routes_with_threshold_and_security(
380        &patterns,
381        camel_config.stream_caching.threshold,
382        security_compile_context.clone(),
383    ) {
384        Ok(defs) => {
385            for def in defs {
386                let id = def.route_id().to_string();
387                if let Err(e) = ctx.add_route_definition(def).await {
388                    // log-policy: system-broken
389                    tracing::error!("Failed to add route '{}': {}", id, e);
390                }
391            }
392        }
393        Err(e) => {
394            // log-policy: system-broken
395            tracing::error!("Failed to discover routes: {}", e);
396            std::process::exit(1);
397        }
398    }
399
400    // 6. Start context
401    if let Err(e) = ctx.start().await {
402        // log-policy: system-broken
403        tracing::error!("Failed to start CamelContext: {}", e);
404        std::process::exit(1);
405    }
406
407    tracing::info!("camel-cli: context started");
408
409    // 7. Resolve whether to enable the file watcher:
410    //    CLI flag takes precedence; falls back to Camel.toml `watch` field (default: false).
411    let watch_enabled = cli_watch.unwrap_or(camel_config.watch);
412
413    // 8. Optionally start file watcher in background
414    let watcher_shutdown = CancellationToken::new();
415    if watch_enabled {
416        let ctrl = ctx.runtime_execution_handle();
417        let watch_patterns = patterns.clone();
418        let watch_security_compile_context = security_compile_context.clone();
419        let drain_timeout = std::time::Duration::from_millis(camel_config.drain_timeout_ms);
420        let debounce = std::time::Duration::from_millis(camel_config.watch_debounce_ms);
421        let watcher_token = watcher_shutdown.clone();
422        tokio::spawn(async move {
423            let watch_dirs = camel_core::reload_watcher::resolve_watch_dirs(&watch_patterns);
424            let result = camel_core::reload_watcher::watch_and_reload(
425                watch_dirs,
426                ctrl,
427                move || {
428                    camel_dsl::discover_routes_with_threshold_and_security(
429                        &watch_patterns,
430                        camel_config.stream_caching.threshold,
431                        watch_security_compile_context.clone(),
432                    )
433                    .map_err(|e| camel_api::CamelError::RouteError(e.to_string()))
434                },
435                Some(watcher_token),
436                drain_timeout,
437                debounce,
438            )
439            .await;
440            if let Err(e) = result {
441                // log-policy: system-broken
442                tracing::error!("File watcher failed: {}", e);
443            }
444        });
445        tracing::info!(
446            "camel-cli: hot-reload watching {:?}. Press Ctrl+C to stop.",
447            patterns
448        );
449    } else {
450        tracing::info!("camel-cli: running (hot-reload disabled). Press Ctrl+C to stop.");
451    }
452
453    tokio::select! {
454        _ = tokio::signal::ctrl_c() => tracing::info!("Received Ctrl+C"),
455        _ = async {
456            #[cfg(unix)]
457            {
458                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
459                    .expect("Failed to install SIGTERM handler") // allow-unwrap
460                    .recv()
461                    .await
462            }
463            #[cfg(not(unix))]
464            {
465                std::future::pending::<()>().await
466            }
467        } => tracing::info!("Received SIGTERM"),
468    }
469
470    // Second Ctrl+C = force exit
471    let force_exit = tokio::spawn(async {
472        tokio::signal::ctrl_c().await.ok();
473        tracing::warn!("Second Ctrl+C — forcing exit");
474        std::process::exit(1);
475    });
476
477    tracing::info!("camel-cli: shutting down...");
478    watcher_shutdown.cancel();
479
480    // Signal pools to stop restarting BEFORE context shutdown
481    jms_pool.begin_shutdown();
482    cxf_pool.begin_shutdown();
483
484    // Stop context (routes + lifecycle services)
485    ctx.stop().await.unwrap_or_else(|e| {
486        // log-policy: system-broken
487        tracing::error!("Error during shutdown: {}", e);
488    });
489
490    // Tear down bridge pools with timeouts
491    const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
492
493    match tokio::time::timeout(SHUTDOWN_TIMEOUT, jms_pool.shutdown()).await {
494        Ok(Ok(())) => {}
495        Ok(Err(e)) => {
496            // log-policy: system-broken
497            tracing::error!("JMS pool shutdown failed: {}", e);
498        }
499        Err(_) => tracing::warn!("JMS pool shutdown timed out after 30s"),
500    }
501
502    match tokio::time::timeout(SHUTDOWN_TIMEOUT, cxf_pool.shutdown()).await {
503        Ok(Ok(())) => {}
504        Ok(Err(e)) => {
505            // log-policy: system-broken
506            tracing::error!("CXF pool shutdown failed: {}", e);
507        }
508        Err(_) => tracing::warn!("CXF pool shutdown timed out after 30s"),
509    }
510
511    force_exit.abort();
512
513    tracing::info!("camel-cli: stopped");
514    Ok(())
515}