Skip to main content

camel_config/
context_ext.rs

1use crate::config::{CamelConfig, OtelProtocol, OtelSampler};
2use crate::discovery::discover_routes;
3use camel_api::CamelError;
4use camel_core::CamelContext;
5use camel_core::OutputFormat;
6use camel_core::TracerConfig;
7use camel_core::route::RouteDefinition;
8use camel_otel::{
9    OtelConfig, OtelProtocol as OtelProtocolOtel, OtelSampler as OtelSamplerOtel, OtelService,
10};
11use tracing::Level;
12use tracing_subscriber::Layer;
13use tracing_subscriber::filter::filter_fn;
14use tracing_subscriber::fmt::format::FmtSpan;
15use tracing_subscriber::layer::SubscriberExt;
16use tracing_subscriber::util::SubscriberInitExt;
17
#[cfg(feature = "http")]
impl From<&crate::config::HttpCamelConfig> for camel_component_http::HttpConfig {
    /// Build the HTTP component configuration from the `http` section of the
    /// Camel config by copying each field across under the same name.
    fn from(source: &crate::config::HttpCamelConfig) -> Self {
        Self {
            // Timeouts
            connect_timeout_ms: source.connect_timeout_ms,
            response_timeout_ms: source.response_timeout_ms,
            // Limits
            max_connections: source.max_connections,
            max_body_size: source.max_body_size,
            max_request_body: source.max_request_body,
            // Flag copied verbatim from the config section
            allow_private_ips: source.allow_private_ips,
        }
    }
}
31
#[cfg(feature = "kafka")]
impl From<&crate::config::KafkaCamelConfig> for camel_component_kafka::KafkaConfig {
    /// Build the Kafka component configuration from the `kafka` section of the
    /// Camel config. Owned fields are cloned because only a borrow of the
    /// section is available; the timeout fields are copied.
    fn from(source: &crate::config::KafkaCamelConfig) -> Self {
        Self {
            brokers: source.brokers.clone(),
            group_id: source.group_id.clone(),
            session_timeout_ms: source.session_timeout_ms,
            request_timeout_ms: source.request_timeout_ms,
            auto_offset_reset: source.auto_offset_reset.clone(),
            security_protocol: source.security_protocol.clone(),
        }
    }
}
45
#[cfg(feature = "redis")]
impl From<&crate::config::RedisCamelConfig> for camel_component_redis::RedisConfig {
    /// Build the Redis component configuration from the `redis` section of the
    /// Camel config: the host is cloned and the port is copied.
    fn from(source: &crate::config::RedisCamelConfig) -> Self {
        let host = source.host.clone();
        let port = source.port;
        Self { host, port }
    }
}
55
#[cfg(feature = "sql")]
impl From<&crate::config::SqlCamelConfig> for camel_component_sql::SqlGlobalConfig {
    /// Build the SQL global configuration from the `sql` section of the Camel
    /// config, starting from the component defaults and overriding the pool
    /// settings present in the section.
    fn from(source: &crate::config::SqlCamelConfig) -> Self {
        // Pool size bounds first, then the two lifetime limits
        let sized = Self::default()
            .with_max_connections(source.max_connections)
            .with_min_connections(source.min_connections);

        sized
            .with_idle_timeout_secs(source.idle_timeout_secs)
            .with_max_lifetime_secs(source.max_lifetime_secs)
    }
}
66
#[cfg(feature = "file")]
impl From<&crate::config::FileCamelConfig> for camel_component_file::FileGlobalConfig {
    /// Build the file component's global configuration from the `file` section
    /// of the Camel config, starting from the component defaults and
    /// overriding the delay and timeout settings present in the section.
    fn from(source: &crate::config::FileCamelConfig) -> Self {
        // Delays first, then the read/write timeouts
        let delayed = Self::default()
            .with_delay_ms(source.delay_ms)
            .with_initial_delay_ms(source.initial_delay_ms);

        delayed
            .with_read_timeout_ms(source.read_timeout_ms)
            .with_write_timeout_ms(source.write_timeout_ms)
    }
}
77
#[cfg(feature = "container")]
impl From<&crate::config::ContainerCamelConfig>
    for camel_component_container::ContainerGlobalConfig
{
    /// Build the container component's global configuration from the
    /// `container` section of the Camel config, starting from the component
    /// defaults and overriding the Docker host.
    fn from(source: &crate::config::ContainerCamelConfig) -> Self {
        let docker_host = source.docker_host.clone();
        Self::default().with_docker_host(docker_host)
    }
}
87
88impl CamelConfig {
89    /// Load routes from config file and return them (without adding to context yet)
90    /// This allows components to be registered before routes are resolved
91    pub fn load_routes(path: &str) -> Result<Vec<RouteDefinition>, CamelError> {
92        let config = Self::from_file_with_profile_and_env(path, None)
93            .map_err(|e| CamelError::Config(e.to_string()))?;
94
95        if config.routes.is_empty() {
96            return Ok(Vec::new());
97        }
98
99        discover_routes(&config.routes).map_err(|e| CamelError::Config(e.to_string()))
100    }
101
102    /// Create a CamelContext configured from this CamelConfig.
103    ///
104    /// Always installs a unified tracing subscriber (Layers 1–3, plus Layer 4
105    /// when OTel is enabled). `OtelService`, if present, only manages providers —
106    /// it never installs a subscriber.
107    pub fn configure_context(config: &CamelConfig) -> Result<CamelContext, CamelError> {
108        let otel_enabled = config
109            .observability
110            .otel
111            .as_ref()
112            .is_some_and(|o| o.enabled);
113
114        // Build context with optional supervision
115        let mut ctx = if let Some(ref sup) = config.supervision {
116            if let Some(path) = config.runtime_journal_path.as_deref() {
117                CamelContext::with_supervision_and_metrics_and_runtime_journal_path(
118                    sup.clone().into_supervision_config(),
119                    std::sync::Arc::new(camel_api::NoOpMetrics),
120                    path.to_string(),
121                )
122            } else {
123                CamelContext::with_supervision(sup.clone().into_supervision_config())
124            }
125        } else if let Some(path) = config.runtime_journal_path.as_deref() {
126            CamelContext::new_with_runtime_journal_path(path.to_string())
127        } else {
128            CamelContext::new()
129        };
130
131        ctx.set_shutdown_timeout(std::time::Duration::from_millis(config.timeout_ms));
132
133        let tracer_config = config.observability.tracer.clone();
134
135        // Always install the unified subscriber — OtelService no longer owns it
136        Self::init_tracing_subscriber(&tracer_config, &config.log_level, otel_enabled)?;
137
138        // OtelService manages providers only — subscriber is already installed above
139        if otel_enabled {
140            let otel_cfg = config.observability.otel.as_ref().unwrap();
141
142            let protocol = match otel_cfg.protocol {
143                OtelProtocol::Grpc => OtelProtocolOtel::Grpc,
144                OtelProtocol::Http => OtelProtocolOtel::HttpProtobuf,
145            };
146
147            let sampler = match &otel_cfg.sampler {
148                OtelSampler::AlwaysOn => OtelSamplerOtel::AlwaysOn,
149                OtelSampler::AlwaysOff => OtelSamplerOtel::AlwaysOff,
150                OtelSampler::Ratio => {
151                    let ratio = otel_cfg.sampler_ratio.unwrap_or(1.0).clamp(0.0, 1.0);
152                    OtelSamplerOtel::TraceIdRatioBased(ratio)
153                }
154            };
155
156            let mut otel_config = OtelConfig::new(&otel_cfg.endpoint, &otel_cfg.service_name)
157                .with_protocol(protocol)
158                .with_sampler(sampler)
159                .with_log_level(&otel_cfg.log_level)
160                .with_logs_enabled(otel_cfg.logs_enabled)
161                .with_metrics_interval_ms(otel_cfg.metrics_interval_ms);
162
163            for (key, value) in &otel_cfg.resource_attrs {
164                otel_config = otel_config.with_resource_attr(key, value);
165            }
166
167            let otel_service = OtelService::new(otel_config);
168            ctx = ctx.with_lifecycle(otel_service);
169        }
170
171        // Prometheus — replaces loose metrics_enabled / metrics_port
172        if let Some(ref prom) = config.observability.prometheus
173            && prom.enabled
174        {
175            let addr: std::net::SocketAddr = format!("{}:{}", prom.host, prom.port)
176                .parse()
177                .map_err(|_| {
178                    CamelError::Config(format!(
179                        "Invalid prometheus bind address: {}:{}",
180                        prom.host, prom.port
181                    ))
182                })?;
183            let prom_service = camel_prometheus::PrometheusService::new(addr);
184            ctx = ctx.with_lifecycle(prom_service);
185        }
186
187        #[cfg(feature = "http")]
188        {
189            let http_config: camel_component_http::HttpConfig = config
190                .components
191                .http
192                .as_ref()
193                .map(camel_component_http::HttpConfig::from)
194                .unwrap_or_default();
195            ctx.set_component_config(http_config);
196        }
197
198        #[cfg(feature = "kafka")]
199        {
200            let kafka_config: camel_component_kafka::KafkaConfig = config
201                .components
202                .kafka
203                .as_ref()
204                .map(camel_component_kafka::KafkaConfig::from)
205                .unwrap_or_default();
206            ctx.set_component_config(kafka_config);
207        }
208
209        #[cfg(feature = "redis")]
210        {
211            let redis_config: camel_component_redis::RedisConfig = config
212                .components
213                .redis
214                .as_ref()
215                .map(camel_component_redis::RedisConfig::from)
216                .unwrap_or_default();
217            ctx.set_component_config(redis_config);
218        }
219
220        #[cfg(feature = "sql")]
221        {
222            let sql_config: camel_component_sql::SqlGlobalConfig = config
223                .components
224                .sql
225                .as_ref()
226                .map(camel_component_sql::SqlGlobalConfig::from)
227                .unwrap_or_default();
228            ctx.set_component_config(sql_config);
229        }
230
231        #[cfg(feature = "file")]
232        {
233            let file_config: camel_component_file::FileGlobalConfig = config
234                .components
235                .file
236                .as_ref()
237                .map(camel_component_file::FileGlobalConfig::from)
238                .unwrap_or_default();
239            ctx.set_component_config(file_config);
240        }
241
242        #[cfg(feature = "container")]
243        {
244            let container_config: camel_component_container::ContainerGlobalConfig = config
245                .components
246                .container
247                .as_ref()
248                .map(camel_component_container::ContainerGlobalConfig::from)
249                .unwrap_or_default();
250            ctx.set_component_config(container_config);
251        }
252
253        ctx.set_tracer_config(tracer_config);
254        Ok(ctx)
255    }
256
257    fn init_tracing_subscriber(
258        config: &TracerConfig,
259        log_level: &str,
260        otel_active: bool,
261    ) -> Result<(), CamelError> {
262        let level = parse_log_level(log_level);
263
264        // Layer 1+2: general fmt layer — all log events, stdout, plaintext
265        let general_layer = tracing_subscriber::fmt::layer()
266            .with_writer(std::io::stdout)
267            .with_filter(tracing_subscriber::filter::LevelFilter::from_level(level))
268            .boxed();
269
270        // Layer 3a: camel_tracer stdout output (JSON or Plain)
271        let stdout_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> =
272            if config.enabled && config.outputs.stdout.enabled {
273                match config.outputs.stdout.format {
274                    OutputFormat::Json => Some(
275                        tracing_subscriber::fmt::layer()
276                            .json()
277                            .with_span_events(FmtSpan::CLOSE)
278                            .with_target(true)
279                            .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
280                            .boxed(),
281                    ),
282                    OutputFormat::Plain => Some(
283                        tracing_subscriber::fmt::layer()
284                            .with_span_events(FmtSpan::CLOSE)
285                            .with_target(true)
286                            .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
287                            .boxed(),
288                    ),
289                }
290            } else {
291                None
292            };
293
294        // Layer 3b: camel_tracer file output (JSON or Plain)
295        let file_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = if config
296            .enabled
297            && let Some(ref file_config) = config.outputs.file
298            && file_config.enabled
299        {
300            let file = std::fs::OpenOptions::new()
301                .create(true)
302                .append(true)
303                .open(&file_config.path)
304                .map_err(|e| {
305                    CamelError::Config(format!(
306                        "Failed to open trace file '{}': {}",
307                        file_config.path, e
308                    ))
309                })?;
310
311            match file_config.format {
312                OutputFormat::Json => Some(
313                    tracing_subscriber::fmt::layer()
314                        .json()
315                        .with_span_events(FmtSpan::CLOSE)
316                        .with_writer(std::sync::Mutex::new(file))
317                        .with_target(true)
318                        .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
319                        .boxed(),
320                ),
321                OutputFormat::Plain => Some(
322                    tracing_subscriber::fmt::layer()
323                        .with_span_events(FmtSpan::CLOSE)
324                        .with_writer(std::sync::Mutex::new(file))
325                        .with_target(true)
326                        .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
327                        .boxed(),
328                ),
329            }
330        } else {
331            None
332        };
333
334        // Layer 4: tracing-opentelemetry bridge — only when OTel is active
335        #[cfg(feature = "otel")]
336        let otel_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = if otel_active
337        {
338            Some(
339                tracing_opentelemetry::layer()
340                    .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
341                    .boxed(),
342            )
343        } else {
344            None
345        };
346        #[cfg(not(feature = "otel"))]
347        let _ = otel_active; // suppress unused variable warning
348
349        let mut layers: Vec<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = Vec::new();
350        layers.push(general_layer);
351        if let Some(l) = stdout_layer {
352            layers.push(l);
353        }
354        if let Some(l) = file_layer {
355            layers.push(l);
356        }
357        #[cfg(feature = "otel")]
358        if let Some(l) = otel_layer {
359            layers.push(l);
360        }
361
362        // try_init() silently ignores "already set" error (expected in tests)
363        let _ = tracing_subscriber::registry().with(layers).try_init();
364
365        Ok(())
366    }
367}
368
369/// Parse a log level string, defaulting to INFO on failure.
370fn parse_log_level(s: &str) -> Level {
371    match s.to_lowercase().as_str() {
372        "trace" => Level::TRACE,
373        "debug" => Level::DEBUG,
374        "info" => Level::INFO,
375        "warn" | "warning" => Level::WARN,
376        "error" => Level::ERROR,
377        _ => Level::INFO,
378    }
379}
380
#[cfg(test)]
mod configure_context_smoke_tests {
    use super::*;
    use config::FileFormat;

    /// Smoke test: an empty TOML document deserializes into a `CamelConfig`,
    /// and `configure_context` returns `Ok` for it.
    #[test]
    fn test_configure_context_empty_config() {
        // Empty in-memory TOML source — no file on disk is involved
        let empty_toml = config::File::from_str("", FileFormat::Toml);
        let raw = config::Config::builder()
            .add_source(empty_toml)
            .build()
            .unwrap();
        let cfg: CamelConfig = raw.try_deserialize().unwrap();

        // configure_context compiles and runs without error on empty config
        let outcome = CamelConfig::configure_context(&cfg);
        assert!(outcome.is_ok());
    }
}