use camel_core::TracerConfig;
use config::{Config, ConfigError};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::time::Duration;
/// Top-level configuration, deserialized from TOML.
///
/// Every field carries a serde default, so an empty document deserializes successfully.
#[derive(Debug, Clone, Deserialize)]
pub struct CamelConfig {
/// Route entries; defaults to an empty list.
/// NOTE(review): presumably paths or patterns of route definition files — confirm against the loader.
#[serde(default)]
pub routes: Vec<String>,
/// Watch flag; defaults to `false`.
/// NOTE(review): presumably enables reloading when watched files change — confirm.
#[serde(default)]
pub watch: bool,
/// Optional runtime journal settings; `None` when the table is absent.
#[serde(default)]
pub runtime_journal: Option<JournalConfig>,
/// Log level name; defaults to `"INFO"`.
#[serde(default = "default_log_level")]
pub log_level: String,
/// Timeout in milliseconds; defaults to `5000`.
#[serde(default = "default_timeout_ms")]
pub timeout_ms: u64,
/// Drain timeout in milliseconds; defaults to `10_000`.
#[serde(default = "default_drain_timeout_ms")]
pub drain_timeout_ms: u64,
/// Watch debounce interval in milliseconds; defaults to `300`.
#[serde(default = "default_watch_debounce_ms")]
pub watch_debounce_ms: u64,
/// Raw per-component configuration tables, keyed by component name.
#[serde(default)]
pub components: ComponentsConfig,
/// Tracing, OpenTelemetry, Prometheus and health settings.
#[serde(default)]
pub observability: ObservabilityConfig,
/// Optional supervision (retry/backoff) settings; `None` when the table is absent.
#[serde(default)]
pub supervision: Option<SupervisionCamelConfig>,
/// Platform integration; defaults to [`PlatformCamelConfig::Noop`].
#[serde(default)]
pub platform: PlatformCamelConfig,
/// Stream caching settings; the threshold defaults to the `camel_api` constant.
#[serde(default)]
pub stream_caching: StreamCachingConfig,
}
/// Platform integration selector.
///
/// Internally tagged by the `type` key with snake_case variant names, i.e.
/// `type = "noop"` or `type = "kubernetes"`.
#[derive(Debug, Clone, Deserialize, Default, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PlatformCamelConfig {
/// No platform integration; this is the default variant.
#[default]
Noop,
/// Kubernetes integration; the remaining keys of the table populate the payload.
Kubernetes(KubernetesPlatformCamelConfig),
}
/// Settings for the Kubernetes platform variant.
///
/// All fields are optional in the TOML; missing ones fall back to the `default_*` functions.
/// NOTE(review): the lease-related names suggest lease-based leader election — confirm against the consumer.
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct KubernetesPlatformCamelConfig {
/// Optional namespace; `None` when not configured.
#[serde(default)]
pub namespace: Option<String>,
/// Prefix for lease names; defaults to `"camel-"`.
#[serde(default = "default_lease_name_prefix")]
pub lease_name_prefix: String,
/// Lease duration in seconds; defaults to `15`.
#[serde(default = "default_lease_duration_secs")]
pub lease_duration_secs: u64,
/// Renew deadline in seconds; defaults to `10`.
#[serde(default = "default_renew_deadline_secs")]
pub renew_deadline_secs: u64,
/// Retry period in seconds; defaults to `2`.
#[serde(default = "default_retry_period_secs")]
pub retry_period_secs: u64,
/// Jitter factor; defaults to `0.2`.
#[serde(default = "default_kubernetes_jitter_factor")]
pub jitter_factor: f64,
}
impl Default for KubernetesPlatformCamelConfig {
fn default() -> Self {
Self {
namespace: None,
lease_name_prefix: default_lease_name_prefix(),
lease_duration_secs: default_lease_duration_secs(),
renew_deadline_secs: default_renew_deadline_secs(),
retry_period_secs: default_retry_period_secs(),
jitter_factor: default_kubernetes_jitter_factor(),
}
}
}
/// Default for `KubernetesPlatformCamelConfig::lease_name_prefix`: `"camel-"`.
fn default_lease_name_prefix() -> String {
    String::from("camel-")
}
/// Default for `KubernetesPlatformCamelConfig::lease_duration_secs`.
fn default_lease_duration_secs() -> u64 {
    const LEASE_DURATION_SECS: u64 = 15;
    LEASE_DURATION_SECS
}
/// Default for `KubernetesPlatformCamelConfig::renew_deadline_secs`.
fn default_renew_deadline_secs() -> u64 {
    const RENEW_DEADLINE_SECS: u64 = 10;
    RENEW_DEADLINE_SECS
}
/// Default for `KubernetesPlatformCamelConfig::retry_period_secs`.
fn default_retry_period_secs() -> u64 {
    const RETRY_PERIOD_SECS: u64 = 2;
    RETRY_PERIOD_SECS
}
/// Default for `KubernetesPlatformCamelConfig::jitter_factor`.
fn default_kubernetes_jitter_factor() -> f64 {
    const JITTER_FACTOR: f64 = 0.2;
    JITTER_FACTOR
}
/// Untyped component configuration.
///
/// The single field is flattened, so every key of the `components` table is captured
/// as-is in `raw`.
#[derive(Debug, Clone, Deserialize, Default, PartialEq)]
pub struct ComponentsConfig {
/// Raw TOML value per key (e.g. `kafka`, `redis`, `http`), left for consumers to interpret.
#[serde(flatten)]
pub raw: HashMap<String, toml::Value>,
}
/// Prometheus settings found under `[observability.prometheus]`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct PrometheusCamelConfig {
/// Enabled flag; defaults to `false`.
#[serde(default)]
pub enabled: bool,
/// Host string; defaults to `"0.0.0.0"`.
#[serde(default = "default_prometheus_host")]
pub host: String,
/// Port number; defaults to `9090`.
#[serde(default = "default_prometheus_port")]
pub port: u16,
}
impl Default for PrometheusCamelConfig {
fn default() -> Self {
Self {
enabled: false,
host: default_prometheus_host(),
port: default_prometheus_port(),
}
}
}
/// Default for `PrometheusCamelConfig::host`: `"0.0.0.0"`.
fn default_prometheus_host() -> String {
    String::from("0.0.0.0")
}
/// Default for `PrometheusCamelConfig::port`.
fn default_prometheus_port() -> u16 {
    const PROMETHEUS_PORT: u16 = 9090;
    PROMETHEUS_PORT
}
/// Health settings found under `[observability.health]`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct HealthCamelConfig {
/// Enabled flag; defaults to `false`.
#[serde(default)]
pub enabled: bool,
/// Host string; defaults to `"0.0.0.0"`.
#[serde(default = "default_health_host")]
pub host: String,
/// Port number; defaults to `8081`.
#[serde(default = "default_health_port")]
pub port: u16,
}
impl Default for HealthCamelConfig {
fn default() -> Self {
Self {
enabled: false,
host: default_health_host(),
port: default_health_port(),
}
}
}
/// Default for `HealthCamelConfig::host`: `"0.0.0.0"`.
fn default_health_host() -> String {
    String::from("0.0.0.0")
}
/// Default for `HealthCamelConfig::port`.
fn default_health_port() -> u16 {
    const HEALTH_PORT: u16 = 8081;
    HEALTH_PORT
}
/// Observability settings found under `[observability]`.
///
/// The optional sections are `None` when their table is absent from the configuration.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ObservabilityConfig {
/// Tracer settings from `camel_core`; uses `TracerConfig::default()` when absent.
#[serde(default)]
pub tracer: TracerConfig,
/// Optional OpenTelemetry settings.
#[serde(default)]
pub otel: Option<OtelCamelConfig>,
/// Optional Prometheus settings.
#[serde(default)]
pub prometheus: Option<PrometheusCamelConfig>,
/// Optional health settings.
#[serde(default)]
pub health: Option<HealthCamelConfig>,
}
/// OpenTelemetry protocol selector, written in snake_case (`"grpc"` or `"http"`).
#[derive(Debug, Clone, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtelProtocol {
/// The default variant.
#[default]
Grpc,
/// Selected with `"http"`.
Http,
}
/// OpenTelemetry sampler selector, written in snake_case
/// (`"always_on"`, `"always_off"` or `"ratio"`).
#[derive(Debug, Clone, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtelSampler {
/// The default variant.
#[default]
AlwaysOn,
/// Selected with `"always_off"`.
AlwaysOff,
/// Selected with `"ratio"`.
/// NOTE(review): presumably paired with `OtelCamelConfig::sampler_ratio` — confirm in the consumer.
Ratio,
}
#[derive(Debug, Clone, Deserialize, Default)]
pub struct OtelCamelConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_otel_endpoint")]
pub endpoint: String,
#[serde(default = "default_otel_service_name")]
pub service_name: String,
#[serde(default = "default_otel_log_level")]
pub log_level: String,
#[serde(default)]
pub protocol: OtelProtocol,
#[serde(default)]
pub sampler: OtelSampler,
#[serde(default)]
pub sampler_ratio: Option<f64>,
#[serde(default = "default_otel_metrics_interval_ms")]
pub metrics_interval_ms: u64,
#[serde(default = "default_true")]
pub logs_enabled: bool,
#[serde(default)]
pub resource_attrs: HashMap<String, String>,
}
/// Supervision (retry/backoff) settings; converted into `camel_api::SupervisionConfig`
/// by `into_supervision_config`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct SupervisionCamelConfig {
/// Optional attempt limit.
/// NOTE(review): there is no explicit serde default here; serde treats a missing `Option`
/// field as `None`, whereas `Default::default()` yields `Some(5)` — confirm this asymmetry
/// is intended.
pub max_attempts: Option<u32>,
/// Initial delay in milliseconds; defaults to `1000`.
#[serde(default = "default_initial_delay_ms")]
pub initial_delay_ms: u64,
/// Backoff multiplier; defaults to `2.0`.
#[serde(default = "default_backoff_multiplier")]
pub backoff_multiplier: f64,
/// Maximum delay in milliseconds; defaults to `60000`.
#[serde(default = "default_max_delay_ms")]
pub max_delay_ms: u64,
}
impl Default for SupervisionCamelConfig {
fn default() -> Self {
Self {
max_attempts: Some(5),
initial_delay_ms: 1000,
backoff_multiplier: 2.0,
max_delay_ms: 60000,
}
}
}
impl SupervisionCamelConfig {
    /// Consumes the configuration and builds the `camel_api::SupervisionConfig` equivalent,
    /// turning the millisecond fields into [`Duration`] values.
    pub fn into_supervision_config(self) -> camel_api::SupervisionConfig {
        let Self {
            max_attempts,
            initial_delay_ms,
            backoff_multiplier,
            max_delay_ms,
        } = self;

        let initial_delay = Duration::from_millis(initial_delay_ms);
        let max_delay = Duration::from_millis(max_delay_ms);

        camel_api::SupervisionConfig {
            max_attempts,
            initial_delay,
            backoff_multiplier,
            max_delay,
        }
    }
}
/// Journal durability selector, written in snake_case (`"immediate"` or `"eventual"`).
///
/// Converts one-to-one into `camel_core::JournalDurability`.
#[derive(Debug, Clone, Deserialize, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum JournalDurability {
/// The default variant.
#[default]
Immediate,
/// Selected with `"eventual"`.
Eventual,
}
/// Maps each configuration variant onto the `camel_core` variant of the same name.
impl From<JournalDurability> for camel_core::JournalDurability {
    fn from(durability: JournalDurability) -> Self {
        match durability {
            JournalDurability::Eventual => Self::Eventual,
            JournalDurability::Immediate => Self::Immediate,
        }
    }
}
/// Default for `JournalConfig::compaction_threshold_events`.
fn default_compaction_threshold_events() -> u64 {
    const COMPACTION_THRESHOLD_EVENTS: u64 = 10_000;
    COMPACTION_THRESHOLD_EVENTS
}
/// Runtime journal settings; converted into `camel_core::RedbJournalOptions` via `From`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct JournalConfig {
/// Journal path. This is the only field without a default, so it must be present.
pub path: std::path::PathBuf,
/// Durability mode; defaults to [`JournalDurability::Immediate`].
#[serde(default)]
pub durability: JournalDurability,
/// Compaction threshold in events; defaults to `10_000`.
#[serde(default = "default_compaction_threshold_events")]
pub compaction_threshold_events: u64,
}
/// Stream caching settings found under `[stream_caching]`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct StreamCachingConfig {
/// Threshold; defaults to `camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD`.
/// NOTE(review): unit is presumably bytes — confirm against `camel_api::stream_cache`.
#[serde(default = "default_stream_cache_threshold")]
pub threshold: usize,
}
/// Default for `StreamCachingConfig::threshold`, taken from the `camel_api` constant.
fn default_stream_cache_threshold() -> usize {
    use camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD;
    DEFAULT_STREAM_CACHE_THRESHOLD
}
impl Default for StreamCachingConfig {
fn default() -> Self {
Self {
threshold: default_stream_cache_threshold(),
}
}
}
/// Builds `camel_core::RedbJournalOptions` from the journal settings.
///
/// Only the durability and compaction threshold are carried over; `path` is not part of
/// the options.
impl From<&JournalConfig> for camel_core::RedbJournalOptions {
    fn from(cfg: &JournalConfig) -> Self {
        let durability = cfg.durability.clone().into();
        let compaction_threshold_events = cfg.compaction_threshold_events;
        Self {
            durability,
            compaction_threshold_events,
        }
    }
}
/// Default for `CamelConfig::log_level`: `"INFO"`.
fn default_log_level() -> String {
    String::from("INFO")
}
/// Default for `CamelConfig::timeout_ms`.
fn default_timeout_ms() -> u64 {
    const TIMEOUT_MS: u64 = 5000;
    TIMEOUT_MS
}
/// Default for `CamelConfig::drain_timeout_ms`.
fn default_drain_timeout_ms() -> u64 {
    const DRAIN_TIMEOUT_MS: u64 = 10_000;
    DRAIN_TIMEOUT_MS
}
/// Default for `CamelConfig::watch_debounce_ms`.
fn default_watch_debounce_ms() -> u64 {
    const WATCH_DEBOUNCE_MS: u64 = 300;
    WATCH_DEBOUNCE_MS
}
/// Default for `OtelCamelConfig::endpoint`: `"http://localhost:4317"`.
fn default_otel_endpoint() -> String {
    String::from("http://localhost:4317")
}
/// Default for `OtelCamelConfig::service_name`: `"rust-camel"`.
fn default_otel_service_name() -> String {
    String::from("rust-camel")
}
/// Default for `OtelCamelConfig::log_level`: `"info"`.
fn default_otel_log_level() -> String {
    String::from("info")
}
/// Default for `OtelCamelConfig::metrics_interval_ms`.
fn default_otel_metrics_interval_ms() -> u64 {
    const METRICS_INTERVAL_MS: u64 = 60000;
    METRICS_INTERVAL_MS
}
/// Serde default helper for boolean fields that should be on unless configured otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
/// Default for `SupervisionCamelConfig::initial_delay_ms`.
fn default_initial_delay_ms() -> u64 {
    const INITIAL_DELAY_MS: u64 = 1000;
    INITIAL_DELAY_MS
}
/// Default for `SupervisionCamelConfig::backoff_multiplier`.
fn default_backoff_multiplier() -> f64 {
    const BACKOFF_MULTIPLIER: f64 = 2.0;
    BACKOFF_MULTIPLIER
}
/// Default for `SupervisionCamelConfig::max_delay_ms`.
fn default_max_delay_ms() -> u64 {
    const MAX_DELAY_MS: u64 = 60000;
    MAX_DELAY_MS
}
/// Recursively merges `overlay` into `base`.
///
/// When both values are tables, keys are merged one by one: keys present on both sides
/// recurse, keys present only in `overlay` are copied in, and keys present only in `base`
/// are kept. In every other case `base` is replaced by a clone of `overlay`.
fn merge_toml_values(base: &mut toml::Value, overlay: &toml::Value) {
    if let (toml::Value::Table(target), toml::Value::Table(source)) = (&mut *base, overlay) {
        for (name, incoming) in source {
            match target.get_mut(name) {
                Some(existing) => merge_toml_values(existing, incoming),
                None => {
                    target.insert(name.clone(), incoming.clone());
                }
            }
        }
        return;
    }

    // At least one side is not a table: the overlay wins wholesale.
    *base = overlay.clone();
}
impl CamelConfig {
pub fn from_file(path: &str) -> Result<Self, ConfigError> {
Self::from_file_with_profile(path, None)
}
pub fn from_file_with_env(path: &str) -> Result<Self, ConfigError> {
Self::from_file_with_profile_and_env(path, None)
}
pub fn from_file_with_profile(path: &str, profile: Option<&str>) -> Result<Self, ConfigError> {
let env_profile = env::var("CAMEL_PROFILE").ok();
let profile = profile.or(env_profile.as_deref());
let content = std::fs::read_to_string(path)
.map_err(|e| ConfigError::Message(format!("Failed to read config file: {}", e)))?;
let mut config_value: toml::Value = toml::from_str(&content)
.map_err(|e| ConfigError::Message(format!("Failed to parse TOML: {}", e)))?;
if let Some(p) = profile {
let default_value = config_value.get("default").cloned();
let profile_value = config_value.get(p).cloned();
if let (Some(mut base), Some(overlay)) = (default_value, profile_value) {
merge_toml_values(&mut base, &overlay);
config_value = base;
} else if let Some(profile_val) = config_value.get(p).cloned() {
config_value = profile_val;
} else {
return Err(ConfigError::Message(format!("Unknown profile: {}", p)));
}
} else {
if let Some(default_val) = config_value.get("default").cloned() {
config_value = default_val;
}
}
let merged_toml = toml::to_string(&config_value).map_err(|e| {
ConfigError::Message(format!("Failed to serialize merged config: {}", e))
})?;
let config = Config::builder()
.add_source(config::File::from_str(
&merged_toml,
config::FileFormat::Toml,
))
.build()?;
config.try_deserialize()
}
pub fn from_file_with_profile_and_env(
path: &str,
profile: Option<&str>,
) -> Result<Self, ConfigError> {
let env_profile = env::var("CAMEL_PROFILE").ok();
let profile = profile.or(env_profile.as_deref());
let content = std::fs::read_to_string(path)
.map_err(|e| ConfigError::Message(format!("Failed to read config file: {}", e)))?;
let mut config_value: toml::Value = toml::from_str(&content)
.map_err(|e| ConfigError::Message(format!("Failed to parse TOML: {}", e)))?;
if let Some(p) = profile {
let default_value = config_value.get("default").cloned();
let profile_value = config_value.get(p).cloned();
if let (Some(mut base), Some(overlay)) = (default_value, profile_value) {
merge_toml_values(&mut base, &overlay);
config_value = base;
} else if let Some(profile_val) = config_value.get(p).cloned() {
config_value = profile_val;
} else {
return Err(ConfigError::Message(format!("Unknown profile: {}", p)));
}
} else {
if let Some(default_val) = config_value.get("default").cloned() {
config_value = default_val;
}
}
let merged_toml = toml::to_string(&config_value).map_err(|e| {
ConfigError::Message(format!("Failed to serialize merged config: {}", e))
})?;
let config = Config::builder()
.add_source(config::File::from_str(
&merged_toml,
config::FileFormat::Toml,
))
.add_source(config::Environment::with_prefix("CAMEL").try_parsing(true))
.build()?;
config.try_deserialize()
}
pub fn from_env_or_default() -> Result<Self, ConfigError> {
let path = env::var("CAMEL_CONFIG_FILE").unwrap_or_else(|_| "Camel.toml".to_string());
Self::from_file(&path)
}
}
#[cfg(test)]
mod camel_config_defaults_tests {
    use super::*;

    /// Deserializes a `CamelConfig` straight from a TOML snippet, panicking on invalid input.
    fn load(toml_src: &str) -> CamelConfig {
        toml::from_str(toml_src).unwrap()
    }

    /// An empty document falls back to the 300 ms debounce default.
    #[test]
    fn watch_debounce_ms_default_is_300() {
        assert_eq!(load("").watch_debounce_ms, 300);
    }

    /// An explicit debounce value replaces the default.
    #[test]
    fn watch_debounce_ms_custom_value() {
        assert_eq!(load("watch_debounce_ms = 50").watch_debounce_ms, 50);
    }

    /// An empty document falls back to the `camel_api` stream cache threshold.
    #[test]
    fn stream_caching_default_threshold_is_set() {
        let expected = camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD;
        let actual = load("").stream_caching.threshold;
        assert_eq!(actual, expected);
    }

    /// An explicit `[stream_caching]` threshold replaces the default.
    #[test]
    fn stream_caching_custom_threshold_value() {
        let loaded = load("[stream_caching]\nthreshold = 1234");
        assert_eq!(loaded.stream_caching.threshold, 1234);
    }
}
#[cfg(test)]
mod components_config_tests {
    use super::*;

    /// Every top-level table ends up as an entry in `ComponentsConfig::raw`.
    #[test]
    fn components_config_deserializes_raw_toml_block() {
        let document = r#"
[kafka]
brokers = ["localhost:9092"]
[redis]
host = "redis.local"
"#;
        let parsed = toml::from_str::<ComponentsConfig>(document).unwrap();
        for component in ["kafka", "redis"] {
            assert!(parsed.raw.contains_key(component));
        }
    }
}
#[cfg(test)]
mod prometheus_config_tests {
    use super::*;

    /// Builds a `CamelConfig` from a TOML snippet through the `config` crate,
    /// panicking if building or deserializing fails.
    fn load(source: &str) -> CamelConfig {
        config::Config::builder()
            .add_source(config::File::from_str(source, config::FileFormat::Toml))
            .build()
            .unwrap()
            .try_deserialize()
            .unwrap()
    }

    /// Without an `[observability.prometheus]` table the section is `None`.
    #[test]
    fn test_prometheus_absent_is_none() {
        assert!(load("").observability.prometheus.is_none());
    }

    /// Host and port fall back to their defaults when only `enabled` is set.
    #[test]
    fn test_prometheus_defaults() {
        let loaded = load(
            r#"
[observability.prometheus]
enabled = true
"#,
        );
        let PrometheusCamelConfig {
            enabled,
            host,
            port,
        } = loaded.observability.prometheus.unwrap();
        assert!(enabled);
        assert_eq!(host, "0.0.0.0");
        assert_eq!(port, 9090);
    }

    /// Explicit host and port values replace the defaults.
    #[test]
    fn test_prometheus_full() {
        let loaded = load(
            r#"
[observability.prometheus]
enabled = true
host = "127.0.0.1"
port = 9091
"#,
        );
        let PrometheusCamelConfig { host, port, .. } = loaded.observability.prometheus.unwrap();
        assert_eq!(host, "127.0.0.1");
        assert_eq!(port, 9091);
    }

    /// Health host and port fall back to their defaults when only `enabled` is set.
    #[test]
    fn test_health_config_defaults() {
        let loaded = load(
            r#"
[observability.health]
enabled = true
"#,
        );
        let HealthCamelConfig {
            enabled,
            host,
            port,
        } = loaded.observability.health.unwrap();
        assert!(enabled);
        assert_eq!(host, "0.0.0.0");
        assert_eq!(port, 8081);
    }

    /// A custom health port is honoured while the host keeps its default.
    #[test]
    fn test_health_config_custom_port() {
        let loaded = load(
            r#"
[observability.health]
enabled = true
port = 9091
"#,
        );
        let HealthCamelConfig { host, port, .. } = loaded.observability.health.unwrap();
        assert_eq!(port, 9091);
        assert_eq!(host, "0.0.0.0");
    }
}
#[cfg(test)]
mod platform_config_tests {
    use super::*;

    /// Builds a `CamelConfig` from a TOML snippet through the `config` crate,
    /// panicking if building or deserializing fails.
    fn load(source: &str) -> CamelConfig {
        config::Config::builder()
            .add_source(config::File::from_str(source, config::FileFormat::Toml))
            .build()
            .unwrap()
            .try_deserialize()
            .unwrap()
    }

    /// Unwraps the Kubernetes payload, panicking with the actual variant otherwise.
    fn expect_kubernetes(platform: PlatformCamelConfig) -> KubernetesPlatformCamelConfig {
        match platform {
            PlatformCamelConfig::Kubernetes(k8s) => k8s,
            other => panic!("expected Kubernetes, got {:?}", other),
        }
    }

    /// Without a `[platform]` table the platform is `Noop`.
    #[test]
    fn platform_default_is_noop() {
        let loaded = load("");
        assert!(matches!(loaded.platform, PlatformCamelConfig::Noop));
    }

    /// A fully specified Kubernetes table is read field by field.
    #[test]
    fn platform_parses_kubernetes_from_toml() {
        let loaded = load(
            r#"
[platform]
type = "kubernetes"
namespace = "team-a"
lease_name_prefix = "camel-"
lease_duration_secs = 15
renew_deadline_secs = 10
retry_period_secs = 2
jitter_factor = 0.2
"#,
        );
        let k8s = expect_kubernetes(loaded.platform);
        assert_eq!(k8s.namespace.as_deref(), Some("team-a"));
        assert_eq!(k8s.lease_name_prefix, "camel-");
        assert_eq!(k8s.lease_duration_secs, 15);
        assert_eq!(k8s.renew_deadline_secs, 10);
        assert_eq!(k8s.retry_period_secs, 2);
        assert!((k8s.jitter_factor - 0.2).abs() < f64::EPSILON);
    }

    /// With only `type = "kubernetes"`, every other field takes its default.
    #[test]
    fn platform_kubernetes_defaults() {
        let loaded = load(
            r#"
[platform]
type = "kubernetes"
"#,
        );
        let k8s = expect_kubernetes(loaded.platform);
        assert!(k8s.namespace.is_none());
        assert_eq!(k8s.lease_name_prefix, "camel-");
        assert_eq!(k8s.lease_duration_secs, 15);
        assert_eq!(k8s.renew_deadline_secs, 10);
        assert_eq!(k8s.retry_period_secs, 2);
        assert!((k8s.jitter_factor - 0.2).abs() < f64::EPSILON);
    }

    /// The platform variant follows the selected profile when loading from a file.
    #[test]
    fn platform_parses_kubernetes_from_file_with_profile() {
        use std::io::Write;

        let mut file = tempfile::NamedTempFile::new().expect("temp file");
        file.write_all(
            br#"
[default]
[default.platform]
type = "kubernetes"
namespace = "production"
[dev]
[dev.platform]
type = "noop"
"#,
        )
        .expect("write config");
        let path = file.path().to_str().unwrap();

        let prod = CamelConfig::from_file_with_profile(path, Some("default")).expect("prod config");
        assert!(matches!(prod.platform, PlatformCamelConfig::Kubernetes(_)));

        let dev = CamelConfig::from_file_with_profile(path, Some("dev")).expect("dev config");
        assert!(matches!(dev.platform, PlatformCamelConfig::Noop));
    }
}
#[cfg(test)]
mod profile_loading_tests {
    use super::*;

    /// Writes `contents` to a fresh temporary file and returns its handle.
    fn write_temp_config(contents: &str) -> tempfile::NamedTempFile {
        use std::io::Write;
        let mut f = tempfile::NamedTempFile::new().expect("temp file");
        f.write_all(contents.as_bytes()).expect("write config");
        f
    }

    /// Sets an environment variable and removes it again on drop, so a failing assertion
    /// or `expect` cannot leak the override into the rest of the test process.
    struct EnvVarGuard(&'static str);

    impl EnvVarGuard {
        fn set(key: &'static str, value: &str) -> Self {
            // SAFETY: mutating the process environment is only sound while no other thread
            // accesses it outside of `std::env`. NOTE(review): assumes nothing in the test
            // process reads the environment through non-Rust code concurrently — confirm.
            unsafe {
                std::env::set_var(key, value);
            }
            Self(key)
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            // SAFETY: same requirement as in `EnvVarGuard::set`.
            unsafe {
                std::env::remove_var(self.0);
            }
        }
    }

    /// Nested tables merge key by key, with the overlay winning on conflicts.
    #[test]
    fn test_merge_toml_values_merges_nested_tables() {
        let mut base: toml::Value = toml::from_str(
            r#"
[components.http]
connect_timeout_ms = 1000
pool_max_idle_per_host = 50
"#,
        )
        .unwrap();
        let overlay: toml::Value = toml::from_str(
            r#"
[components.http]
response_timeout_ms = 2000
pool_max_idle_per_host = 99
"#,
        )
        .unwrap();
        merge_toml_values(&mut base, &overlay);
        let http = base
            .get("components")
            .and_then(|v| v.get("http"))
            .expect("merged http table");
        assert_eq!(
            http.get("connect_timeout_ms").and_then(|v| v.as_integer()),
            Some(1000)
        );
        assert_eq!(
            http.get("response_timeout_ms").and_then(|v| v.as_integer()),
            Some(2000)
        );
        assert_eq!(
            http.get("pool_max_idle_per_host")
                .and_then(|v| v.as_integer()),
            Some(99)
        );
    }

    /// A profile is layered over `[default]`: untouched keys survive, overridden keys change.
    #[test]
    fn test_from_file_with_profile_merges_default_and_profile() {
        let file = write_temp_config(
            r#"
[default]
watch = false
[default.components.http]
connect_timeout_ms = 1000
pool_max_idle_per_host = 50
[prod]
watch = true
[prod.components.http]
pool_max_idle_per_host = 200
"#,
        );
        let cfg = CamelConfig::from_file_with_profile(file.path().to_str().unwrap(), Some("prod"))
            .expect("config should load");
        assert!(cfg.watch);
        let http = cfg.components.raw.get("http").expect("http config");
        assert_eq!(
            http.get("connect_timeout_ms").and_then(|v| v.as_integer()),
            Some(1000)
        );
        assert_eq!(
            http.get("pool_max_idle_per_host")
                .and_then(|v| v.as_integer()),
            Some(200)
        );
    }

    /// Without a `[default]` table the profile table is used on its own.
    #[test]
    fn test_from_file_with_profile_uses_profile_when_no_default() {
        let file = write_temp_config(
            r#"
[dev]
watch = true
timeout_ms = 777
"#,
        );
        let cfg = CamelConfig::from_file_with_profile(file.path().to_str().unwrap(), Some("dev"))
            .expect("config should load");
        assert!(cfg.watch);
        assert_eq!(cfg.timeout_ms, 777);
    }

    /// Requesting a profile that has no table is an error naming the profile.
    #[test]
    fn test_from_file_with_profile_unknown_profile_returns_error() {
        let file = write_temp_config(
            r#"
[default]
watch = false
"#,
        );
        let err = CamelConfig::from_file_with_profile(file.path().to_str().unwrap(), Some("qa"))
            .expect_err("should fail");
        assert!(err.to_string().contains("Unknown profile: qa"));
    }

    /// Without a profile the `[default]` table supplies the configuration.
    #[test]
    fn test_from_file_without_profile_uses_default_section() {
        let file = write_temp_config(
            r#"
[default]
watch = true
timeout_ms = 321
"#,
        );
        let cfg =
            CamelConfig::from_file(file.path().to_str().unwrap()).expect("config should load");
        assert!(cfg.watch);
        assert_eq!(cfg.timeout_ms, 321);
    }

    /// `CAMEL_TIMEOUT_MS` overrides `timeout_ms` from the file.
    #[test]
    fn test_from_file_with_env_overrides_timeout() {
        let file = write_temp_config(
            r#"
[default]
timeout_ms = 1000
"#,
        );
        // Held until the end of the test; the variable is removed even if an assertion fails.
        let _timeout_override = EnvVarGuard::set("CAMEL_TIMEOUT_MS", "9999");
        let cfg = CamelConfig::from_file_with_env(file.path().to_str().unwrap())
            .expect("config should load with env override");
        assert_eq!(cfg.timeout_ms, 9999);
    }
}