camel_config/
context_ext.rs1use crate::config::{CamelConfig, OtelProtocol, OtelSampler};
2use crate::discovery::discover_routes;
3use camel_api::CamelError;
4use camel_core::CamelContext;
5use camel_core::OutputFormat;
6use camel_core::TracerConfig;
7use camel_core::route::RouteDefinition;
8use camel_otel::{
9 OtelConfig, OtelProtocol as OtelProtocolOtel, OtelSampler as OtelSamplerOtel, OtelService,
10};
11use tracing::Level;
12use tracing_subscriber::Layer;
13use tracing_subscriber::filter::filter_fn;
14use tracing_subscriber::fmt::format::FmtSpan;
15use tracing_subscriber::layer::SubscriberExt;
16use tracing_subscriber::util::SubscriberInitExt;
17
#[cfg(feature = "http")]
/// Converts the HTTP section of the Camel configuration into the HTTP
/// component's own configuration type by copying each field across.
impl From<&crate::config::HttpCamelConfig> for camel_component_http::HttpConfig {
    fn from(source: &crate::config::HttpCamelConfig) -> Self {
        Self {
            connect_timeout_ms: source.connect_timeout_ms,
            response_timeout_ms: source.response_timeout_ms,
            max_connections: source.max_connections,
            max_body_size: source.max_body_size,
            max_request_body: source.max_request_body,
            allow_private_ips: source.allow_private_ips,
        }
    }
}
31
#[cfg(feature = "kafka")]
/// Converts the Kafka section of the Camel configuration into the Kafka
/// component's configuration, cloning the fields the original clones and
/// copying the timeouts.
impl From<&crate::config::KafkaCamelConfig> for camel_component_kafka::KafkaConfig {
    fn from(source: &crate::config::KafkaCamelConfig) -> Self {
        Self {
            brokers: source.brokers.clone(),
            group_id: source.group_id.clone(),
            session_timeout_ms: source.session_timeout_ms,
            request_timeout_ms: source.request_timeout_ms,
            auto_offset_reset: source.auto_offset_reset.clone(),
            security_protocol: source.security_protocol.clone(),
        }
    }
}
45
#[cfg(feature = "redis")]
/// Converts the Redis section of the Camel configuration into the Redis
/// component's configuration (host and port only).
impl From<&crate::config::RedisCamelConfig> for camel_component_redis::RedisConfig {
    fn from(source: &crate::config::RedisCamelConfig) -> Self {
        Self {
            host: source.host.clone(),
            port: source.port,
        }
    }
}
55
#[cfg(feature = "sql")]
/// Converts the SQL section of the Camel configuration into the SQL
/// component's global configuration, starting from its defaults and
/// overriding the pool settings.
impl From<&crate::config::SqlCamelConfig> for camel_component_sql::SqlGlobalConfig {
    fn from(source: &crate::config::SqlCamelConfig) -> Self {
        let defaults = Self::default();
        defaults
            .with_max_connections(source.max_connections)
            .with_min_connections(source.min_connections)
            .with_idle_timeout_secs(source.idle_timeout_secs)
            .with_max_lifetime_secs(source.max_lifetime_secs)
    }
}
66
#[cfg(feature = "file")]
/// Converts the file section of the Camel configuration into the file
/// component's global configuration, starting from its defaults and
/// overriding the delay and timeout settings.
impl From<&crate::config::FileCamelConfig> for camel_component_file::FileGlobalConfig {
    fn from(source: &crate::config::FileCamelConfig) -> Self {
        let defaults = Self::default();
        defaults
            .with_delay_ms(source.delay_ms)
            .with_initial_delay_ms(source.initial_delay_ms)
            .with_read_timeout_ms(source.read_timeout_ms)
            .with_write_timeout_ms(source.write_timeout_ms)
    }
}
77
#[cfg(feature = "container")]
/// Converts the container section of the Camel configuration into the
/// container component's global configuration, overriding only the Docker
/// host on top of the defaults.
impl From<&crate::config::ContainerCamelConfig>
    for camel_component_container::ContainerGlobalConfig
{
    fn from(source: &crate::config::ContainerCamelConfig) -> Self {
        let docker_host = source.docker_host.clone();
        Self::default().with_docker_host(docker_host)
    }
}
87
88impl CamelConfig {
89 pub fn load_routes(path: &str) -> Result<Vec<RouteDefinition>, CamelError> {
92 let config = Self::from_file_with_profile_and_env(path, None)
93 .map_err(|e| CamelError::Config(e.to_string()))?;
94
95 if config.routes.is_empty() {
96 return Ok(Vec::new());
97 }
98
99 discover_routes(&config.routes).map_err(|e| CamelError::Config(e.to_string()))
100 }
101
102 pub fn configure_context(config: &CamelConfig) -> Result<CamelContext, CamelError> {
108 let otel_enabled = config
109 .observability
110 .otel
111 .as_ref()
112 .is_some_and(|o| o.enabled);
113
114 let mut ctx = if let Some(ref sup) = config.supervision {
116 if let Some(path) = config.runtime_journal_path.as_deref() {
117 CamelContext::with_supervision_and_metrics_and_runtime_journal_path(
118 sup.clone().into_supervision_config(),
119 std::sync::Arc::new(camel_api::NoOpMetrics),
120 path.to_string(),
121 )
122 } else {
123 CamelContext::with_supervision(sup.clone().into_supervision_config())
124 }
125 } else if let Some(path) = config.runtime_journal_path.as_deref() {
126 CamelContext::new_with_runtime_journal_path(path.to_string())
127 } else {
128 CamelContext::new()
129 };
130
131 ctx.set_shutdown_timeout(std::time::Duration::from_millis(config.timeout_ms));
132
133 let tracer_config = config.observability.tracer.clone();
134
135 Self::init_tracing_subscriber(&tracer_config, &config.log_level, otel_enabled)?;
137
138 if otel_enabled {
140 let otel_cfg = config.observability.otel.as_ref().unwrap();
141
142 let protocol = match otel_cfg.protocol {
143 OtelProtocol::Grpc => OtelProtocolOtel::Grpc,
144 OtelProtocol::Http => OtelProtocolOtel::HttpProtobuf,
145 };
146
147 let sampler = match &otel_cfg.sampler {
148 OtelSampler::AlwaysOn => OtelSamplerOtel::AlwaysOn,
149 OtelSampler::AlwaysOff => OtelSamplerOtel::AlwaysOff,
150 OtelSampler::Ratio => {
151 let ratio = otel_cfg.sampler_ratio.unwrap_or(1.0).clamp(0.0, 1.0);
152 OtelSamplerOtel::TraceIdRatioBased(ratio)
153 }
154 };
155
156 let mut otel_config = OtelConfig::new(&otel_cfg.endpoint, &otel_cfg.service_name)
157 .with_protocol(protocol)
158 .with_sampler(sampler)
159 .with_log_level(&otel_cfg.log_level)
160 .with_logs_enabled(otel_cfg.logs_enabled)
161 .with_metrics_interval_ms(otel_cfg.metrics_interval_ms);
162
163 for (key, value) in &otel_cfg.resource_attrs {
164 otel_config = otel_config.with_resource_attr(key, value);
165 }
166
167 let otel_service = OtelService::new(otel_config);
168 ctx = ctx.with_lifecycle(otel_service);
169 }
170
171 if let Some(ref prom) = config.observability.prometheus
173 && prom.enabled
174 {
175 let addr: std::net::SocketAddr = format!("{}:{}", prom.host, prom.port)
176 .parse()
177 .map_err(|_| {
178 CamelError::Config(format!(
179 "Invalid prometheus bind address: {}:{}",
180 prom.host, prom.port
181 ))
182 })?;
183 let prom_service = camel_prometheus::PrometheusService::new(addr);
184 ctx = ctx.with_lifecycle(prom_service);
185 }
186
187 #[cfg(feature = "http")]
188 {
189 let http_config: camel_component_http::HttpConfig = config
190 .components
191 .http
192 .as_ref()
193 .map(camel_component_http::HttpConfig::from)
194 .unwrap_or_default();
195 ctx.set_component_config(http_config);
196 }
197
198 #[cfg(feature = "kafka")]
199 {
200 let kafka_config: camel_component_kafka::KafkaConfig = config
201 .components
202 .kafka
203 .as_ref()
204 .map(camel_component_kafka::KafkaConfig::from)
205 .unwrap_or_default();
206 ctx.set_component_config(kafka_config);
207 }
208
209 #[cfg(feature = "redis")]
210 {
211 let redis_config: camel_component_redis::RedisConfig = config
212 .components
213 .redis
214 .as_ref()
215 .map(camel_component_redis::RedisConfig::from)
216 .unwrap_or_default();
217 ctx.set_component_config(redis_config);
218 }
219
220 #[cfg(feature = "sql")]
221 {
222 let sql_config: camel_component_sql::SqlGlobalConfig = config
223 .components
224 .sql
225 .as_ref()
226 .map(camel_component_sql::SqlGlobalConfig::from)
227 .unwrap_or_default();
228 ctx.set_component_config(sql_config);
229 }
230
231 #[cfg(feature = "file")]
232 {
233 let file_config: camel_component_file::FileGlobalConfig = config
234 .components
235 .file
236 .as_ref()
237 .map(camel_component_file::FileGlobalConfig::from)
238 .unwrap_or_default();
239 ctx.set_component_config(file_config);
240 }
241
242 #[cfg(feature = "container")]
243 {
244 let container_config: camel_component_container::ContainerGlobalConfig = config
245 .components
246 .container
247 .as_ref()
248 .map(camel_component_container::ContainerGlobalConfig::from)
249 .unwrap_or_default();
250 ctx.set_component_config(container_config);
251 }
252
253 ctx.set_tracer_config(tracer_config);
254 Ok(ctx)
255 }
256
257 fn init_tracing_subscriber(
258 config: &TracerConfig,
259 log_level: &str,
260 otel_active: bool,
261 ) -> Result<(), CamelError> {
262 let level = parse_log_level(log_level);
263
264 let general_layer = tracing_subscriber::fmt::layer()
266 .with_writer(std::io::stdout)
267 .with_filter(tracing_subscriber::filter::LevelFilter::from_level(level))
268 .boxed();
269
270 let stdout_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> =
272 if config.enabled && config.outputs.stdout.enabled {
273 match config.outputs.stdout.format {
274 OutputFormat::Json => Some(
275 tracing_subscriber::fmt::layer()
276 .json()
277 .with_span_events(FmtSpan::CLOSE)
278 .with_target(true)
279 .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
280 .boxed(),
281 ),
282 OutputFormat::Plain => Some(
283 tracing_subscriber::fmt::layer()
284 .with_span_events(FmtSpan::CLOSE)
285 .with_target(true)
286 .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
287 .boxed(),
288 ),
289 }
290 } else {
291 None
292 };
293
294 let file_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = if config
296 .enabled
297 && let Some(ref file_config) = config.outputs.file
298 && file_config.enabled
299 {
300 let file = std::fs::OpenOptions::new()
301 .create(true)
302 .append(true)
303 .open(&file_config.path)
304 .map_err(|e| {
305 CamelError::Config(format!(
306 "Failed to open trace file '{}': {}",
307 file_config.path, e
308 ))
309 })?;
310
311 match file_config.format {
312 OutputFormat::Json => Some(
313 tracing_subscriber::fmt::layer()
314 .json()
315 .with_span_events(FmtSpan::CLOSE)
316 .with_writer(std::sync::Mutex::new(file))
317 .with_target(true)
318 .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
319 .boxed(),
320 ),
321 OutputFormat::Plain => Some(
322 tracing_subscriber::fmt::layer()
323 .with_span_events(FmtSpan::CLOSE)
324 .with_writer(std::sync::Mutex::new(file))
325 .with_target(true)
326 .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
327 .boxed(),
328 ),
329 }
330 } else {
331 None
332 };
333
334 #[cfg(feature = "otel")]
336 let otel_layer: Option<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = if otel_active
337 {
338 Some(
339 tracing_opentelemetry::layer()
340 .with_filter(filter_fn(|meta| meta.target() == "camel_tracer"))
341 .boxed(),
342 )
343 } else {
344 None
345 };
346 #[cfg(not(feature = "otel"))]
347 let _ = otel_active; let mut layers: Vec<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = Vec::new();
350 layers.push(general_layer);
351 if let Some(l) = stdout_layer {
352 layers.push(l);
353 }
354 if let Some(l) = file_layer {
355 layers.push(l);
356 }
357 #[cfg(feature = "otel")]
358 if let Some(l) = otel_layer {
359 layers.push(l);
360 }
361
362 let _ = tracing_subscriber::registry().with(layers).try_init();
364
365 Ok(())
366 }
367}
368
369fn parse_log_level(s: &str) -> Level {
371 match s.to_lowercase().as_str() {
372 "trace" => Level::TRACE,
373 "debug" => Level::DEBUG,
374 "info" => Level::INFO,
375 "warn" | "warning" => Level::WARN,
376 "error" => Level::ERROR,
377 _ => Level::INFO,
378 }
379}
380
#[cfg(test)]
mod configure_context_smoke_tests {
    use super::*;
    use config::FileFormat;

    /// Smoke test: an empty TOML document is expected to deserialize into a
    /// `CamelConfig`, and `configure_context` is expected to accept it.
    #[test]
    fn test_configure_context_empty_config() {
        let empty_source = config::File::from_str("", FileFormat::Toml);
        let raw = config::Config::builder()
            .add_source(empty_source)
            .build()
            .unwrap();
        let cfg: CamelConfig = raw.try_deserialize().unwrap();

        assert!(CamelConfig::configure_context(&cfg).is_ok());
    }
}
398}