#![allow(clippy::disallowed_methods)]
use std::collections::BTreeMap;
use obs_types::Severity;
use serde::{Deserialize, Serialize};
/// Root of the `obs.yaml` configuration.
///
/// Every section may be omitted in YAML; a missing key falls back to that field's
/// `Default`. Unknown keys are rejected (`deny_unknown_fields`).
///
/// NOTE: field declaration order is also the serialization order, and the set of
/// fields must stay in sync with `VALID_ROOT_KEYS` (checked by a unit test).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct EventsConfig {
    /// Optional filter expression; `None` when absent. Its syntax is interpreted by
    /// consumers of this config — TODO confirm the expected format.
    #[serde(default)]
    pub filter: Option<String>,
    /// `sampling:` section.
    #[serde(default)]
    pub sampling: SamplingConfig,
    /// `limits:` section.
    #[serde(default)]
    pub limits: LimitsConfig,
    /// `audit:` section.
    #[serde(default)]
    pub audit: AuditConfig,
    /// `queues:` section.
    #[serde(default)]
    pub queues: QueuesConfig,
    /// `sinks:` section.
    #[serde(default)]
    pub sinks: SinksConfig,
    /// `service:` section.
    #[serde(default)]
    pub service: ServiceConfig,
    /// Development-mode flag; `false` when absent. Its effect is defined outside
    /// this file — TODO confirm.
    #[serde(default)]
    pub dev_mode: bool,
}
impl EventsConfig {
    /// Starts a fluent [`EventsConfigBuilder`] seeded with `EventsConfig::default()`.
    #[must_use]
    pub fn builder() -> EventsConfigBuilder {
        EventsConfigBuilder::default()
    }

    /// Parses a YAML document after expanding `${VAR}` / `${VAR:-default}` references.
    ///
    /// Does not call [`EventsConfig::validate`]; callers should do so.
    ///
    /// # Errors
    /// Returns [`ConfigError::Yaml`] when the expanded text is not valid YAML or does
    /// not match the schema (unknown keys, wrong types). `annotate_yaml_err` may append
    /// a hint listing the valid root keys.
    pub fn from_yaml_str(yaml: &str) -> Result<Self, ConfigError> {
        let expanded = expand_env_vars(yaml);
        serde_yaml::from_str(&expanded)
            .map_err(|e| ConfigError::Yaml(annotate_yaml_err(e.to_string())))
    }

    /// Reads `path` as UTF-8 text and parses it with [`EventsConfig::from_yaml_str`].
    ///
    /// # Errors
    /// Returns [`ConfigError::Io`] (message prefixed with the path) if the file cannot
    /// be read, or any error produced by [`EventsConfig::from_yaml_str`].
    pub fn from_yaml_path(path: impl AsRef<std::path::Path>) -> Result<Self, ConfigError> {
        let path = path.as_ref();
        let text = std::fs::read_to_string(path)
            .map_err(|e| ConfigError::Io(format!("{}: {}", path.display(), e)))?;
        Self::from_yaml_str(&text)
    }

    /// Overlays environment variables named `{PREFIX}_SECTION__FIELD` onto `self`.
    ///
    /// The prefix is upper-cased and must be followed by `_`. The rest of the variable
    /// name is split on `__`, and each segment is lower-cased to form a key path. For
    /// example, with prefix `obs`, `OBS_SAMPLING__DEFAULT_RATE=0.5` sets
    /// `sampling.default_rate`. Values are parsed as YAML, falling back to a plain string.
    ///
    /// This is best-effort and never fails:
    /// * If `self` cannot be serialized, it is returned unchanged.
    /// * Environment entries whose name or value is not valid UTF-8 are ignored.
    /// * If the merged document no longer deserializes (an unknown key, a type mismatch,
    ///   …), the *whole* overlay is discarded and `self` is returned unchanged.
    ///
    /// NOTE(review): because values are YAML-parsed, e.g. `…_SERVICE__VERSION=1.2`
    /// becomes a number and will likely fail to deserialize into `Option<String>`,
    /// which discards every override — quote such values in the environment.
    #[must_use]
    pub fn merged_with_env(self, prefix: &str) -> Self {
        // Without a faithful serialized copy of `self`, overlaying would start from an
        // empty document and silently reset every non-overridden setting to default.
        let Ok(mut overlay) = serde_yaml::to_value(&self) else {
            return self;
        };
        let prefix_with_under = format!("{}_", prefix.to_ascii_uppercase());
        // `std::env::vars()` panics on the first non-UTF-8 entry anywhere in the process
        // environment; `vars_os()` lets us skip such entries instead.
        for (raw_key, raw_value) in std::env::vars_os() {
            let Some(stripped) = raw_key
                .to_str()
                .and_then(|key| key.strip_prefix(prefix_with_under.as_str()))
            else {
                continue;
            };
            let Some(value) = raw_value.to_str() else {
                continue;
            };
            let path: Vec<String> = stripped.split("__").map(str::to_ascii_lowercase).collect();
            apply_yaml_path(&mut overlay, &path, value);
        }
        serde_yaml::from_value::<EventsConfig>(overlay).unwrap_or(self)
    }
}
/// Root keys accepted by [`EventsConfig`], in field-declaration order.
///
/// Quoted verbatim in the unknown-field hint built by `annotate_yaml_err`; a unit
/// test compares this list against the serialized default config to catch drift.
const VALID_ROOT_KEYS: &[&str] = &[
    "filter",
    "sampling",
    "limits",
    "audit",
    "queues",
    "sinks",
    "service",
    "dev_mode",
];
/// Appends the list of valid `obs.yaml` root keys to a root-level unknown-field error.
///
/// serde_yaml prefixes errors raised inside a nested mapping with that mapping's path
/// (e.g. `sampling: unknown field ...`). serde's own message already names the fields
/// expected at that level. So only messages that *start* with `unknown field` — i.e.
/// raised at the document root — get the hint. A typo inside `sampling:` is thus not
/// answered with the list of root keys. Every other message is returned unchanged.
fn annotate_yaml_err(msg: String) -> String {
    if msg.starts_with("unknown field") {
        format!(
            "{msg}\nhint: valid obs.yaml root keys are: {}",
            VALID_ROOT_KEYS.join(", ")
        )
    } else {
        msg
    }
}
/// Writes `value` at the nested key `path` inside `root`, creating mappings on the way.
///
/// Every node visited — including `root` itself — that is not already a mapping is
/// replaced by an empty mapping. At the last segment, `value` is parsed as YAML, so
/// `0.5` becomes a number and `[a, b]` a sequence. Text that fails to parse is stored
/// as a plain string. An empty `path` leaves `root` untouched.
fn apply_yaml_path(root: &mut serde_yaml::Value, path: &[String], value: &str) {
    /// Coerces `node` into a mapping (discarding any non-mapping content) and returns it.
    fn as_map(node: &mut serde_yaml::Value) -> Option<&mut serde_yaml::Mapping> {
        if !node.is_mapping() {
            *node = serde_yaml::Value::Mapping(Default::default());
        }
        node.as_mapping_mut()
    }

    let Some((leaf, parents)) = path.split_last() else {
        return;
    };

    let mut node = root;
    for segment in parents {
        let Some(map) = as_map(node) else {
            return;
        };
        node = map
            .entry(serde_yaml::Value::String(segment.clone()))
            .or_insert_with(|| serde_yaml::Value::Mapping(Default::default()));
    }

    let Some(map) = as_map(node) else {
        return;
    };
    let parsed = serde_yaml::from_str::<serde_yaml::Value>(value)
        .unwrap_or_else(|_| serde_yaml::Value::String(value.to_string()));
    map.insert(serde_yaml::Value::String(leaf.clone()), parsed);
}
/// Expands `${NAME}` and `${NAME:-default}` references in `input`.
///
/// * `${NAME}` is replaced by the environment variable `NAME`. If it is unset (or not
///   valid Unicode), the reference is kept verbatim.
/// * `${NAME:-default}` falls back to `default` (which may be empty) when `NAME` is
///   unset.
/// * A `${` with no closing `}` is copied through unchanged, along with the rest of
///   the input.
///
/// Substituted values are inserted as raw text without YAML quoting, so a value
/// containing `: ` or newlines can change the structure of the document.
///
/// Scanning works on `&str` slices, so all non-reference text — including multi-byte
/// UTF-8 — is copied exactly. Copying byte-by-byte via `b as char` would re-encode each
/// byte >= 0x80 as its own Latin-1 character and corrupt the text.
fn expand_env_vars(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some((literal, after_open)) = rest.split_once("${") {
        out.push_str(literal);
        let Some((inner, after_close)) = after_open.split_once('}') else {
            // Unterminated reference: no later `${...}` can close either, so emit the
            // remainder untouched.
            out.push_str("${");
            out.push_str(after_open);
            return out;
        };
        let (name, default) = match inner.split_once(":-") {
            Some((n, d)) => (n, Some(d)),
            None => (inner, None),
        };
        match std::env::var(name).ok().or_else(|| default.map(str::to_owned)) {
            Some(resolved) => out.push_str(&resolved),
            None => {
                // Unresolvable: keep the reference so the mistake stays visible.
                out.push_str("${");
                out.push_str(inner);
                out.push('}');
            }
        }
        rest = after_close;
    }
    out.push_str(rest);
    out
}
impl EventsConfig {
    /// Checks numeric settings against their allowed ranges.
    ///
    /// Checked, in order:
    /// * `sampling.default_rate` and every `sampling.per_event` rate lie in `[0.0, 1.0]`
    ///   (NaN is rejected, because `RangeInclusive::contains` is false for it);
    /// * `limits.max_payload_bytes` lies in `1 KiB ..= 16 MiB`;
    /// * each of `queues.log`, `queues.metric`, `queues.trace` is at least 64.
    ///
    /// # Errors
    /// Returns the first violation as [`ConfigError::InvalidRange`]. The error names
    /// the exact offending field and includes its value.
    pub fn validate(&self) -> Result<(), ConfigError> {
        const MIN_PAYLOAD_BYTES: u32 = 1024;
        const MAX_PAYLOAD_BYTES: u32 = 16 * 1024 * 1024;
        const MIN_QUEUE_DEPTH: u32 = 64;

        let rate = self.sampling.default_rate;
        if !(0.0..=1.0).contains(&rate) {
            return Err(ConfigError::invalid_range(
                "sampling.default_rate",
                format!("must be in [0.0, 1.0], got {rate}"),
            ));
        }
        for (name, rate) in &self.sampling.per_event {
            if !(0.0..=1.0).contains(rate) {
                return Err(ConfigError::invalid_range(
                    "sampling.per_event[..]",
                    format!("{name} = {rate} is outside [0.0, 1.0]"),
                ));
            }
        }

        let payload = self.limits.max_payload_bytes;
        if payload < MIN_PAYLOAD_BYTES {
            return Err(ConfigError::invalid_range(
                "limits.max_payload_bytes",
                format!("must be ≥ 1 KiB, got {payload}"),
            ));
        }
        if payload > MAX_PAYLOAD_BYTES {
            return Err(ConfigError::invalid_range(
                "limits.max_payload_bytes",
                format!("must be ≤ 16 MiB, got {payload}"),
            ));
        }

        // Report the specific queue, not a combined `queues.{log,metric,trace}`.
        for (field, depth) in [
            ("queues.log", self.queues.log),
            ("queues.metric", self.queues.metric),
            ("queues.trace", self.queues.trace),
        ] {
            if depth < MIN_QUEUE_DEPTH {
                return Err(ConfigError::invalid_range(
                    field,
                    format!("must be ≥ 64, got {depth}"),
                ));
            }
        }
        Ok(())
    }
}
/// `sampling:` section of `obs.yaml`. Every field has a serde default, so the
/// section (or any key in it) may be omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct SamplingConfig {
    /// Fallback sample rate; defaults to `1.0`. `EventsConfig::validate` requires it
    /// to lie in `[0.0, 1.0]`.
    #[serde(default = "default_one_f64")]
    pub default_rate: f64,
    /// Per-key rate overrides (keys presumably event names — confirm with the sampler).
    /// Empty by default; each value must lie in `[0.0, 1.0]` per `validate`.
    #[serde(default)]
    pub per_event: BTreeMap<String, f64>,
    /// Severity threshold; defaults to `Severity::Warn`. NOTE(review): by its name,
    /// events at or above this level are presumably kept regardless of sampling — confirm.
    #[serde(default = "default_warn")]
    pub always_log_at_or_above: Severity,
    /// Tail-buffer capacity; defaults to 64. Not range-checked by `validate`.
    #[serde(default = "default_64_u16")]
    pub tail_buffer_capacity: u16,
    /// Defaults to `true`. NOTE(review): presumably makes the sampler respect the
    /// sampled flag of an incoming `traceparent` header — confirm.
    #[serde(default = "default_true")]
    pub honour_traceparent_sampled: bool,
}
impl Default for SamplingConfig {
    /// Uses the same values as the per-field serde defaults, so this equals
    /// deserializing an empty `sampling:` mapping.
    fn default() -> Self {
        let default_rate = default_one_f64();
        let always_log_at_or_above = default_warn();
        let tail_buffer_capacity = default_64_u16();
        let honour_traceparent_sampled = default_true();
        Self {
            default_rate,
            per_event: BTreeMap::default(),
            always_log_at_or_above,
            tail_buffer_capacity,
            honour_traceparent_sampled,
        }
    }
}
/// `limits:` section of `obs.yaml`: size caps, in bytes (per the field names).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct LimitsConfig {
    /// Defaults to 256 KiB. `EventsConfig::validate` requires `1 KiB ..= 16 MiB`.
    #[serde(default = "default_256kib_u32")]
    pub max_payload_bytes: u32,
    /// Defaults to 1024. Not range-checked by `validate`.
    #[serde(default = "default_1kib_u16")]
    pub max_label_value_bytes: u16,
    /// Defaults to 256. Not range-checked by `validate`.
    #[serde(default = "default_256_u16")]
    pub max_external_string_bytes: u16,
}
impl Default for LimitsConfig {
    /// Uses the same values as the per-field serde defaults, so this equals
    /// deserializing an empty `limits:` mapping.
    fn default() -> Self {
        let max_payload_bytes = default_256kib_u32();
        let max_label_value_bytes = default_1kib_u16();
        let max_external_string_bytes = default_256_u16();
        Self {
            max_payload_bytes,
            max_label_value_bytes,
            max_external_string_bytes,
        }
    }
}
/// Serde default for `limits.max_external_string_bytes` (256).
const fn default_256_u16() -> u16 {
    1 << 8
}
/// `audit:` section of `obs.yaml`.
///
/// Only the defaults below are established in this file. The behavior of each knob
/// lives in the audit pipeline, so the descriptions are inferred from names — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct AuditConfig {
    /// Defaults to 1024. Presumably the capacity of the audit channel.
    #[serde(default = "default_1024_u32")]
    pub channel_capacity: u32,
    /// Defaults to 100 (milliseconds, per the `_ms` suffix). Presumably the longest a
    /// producer may block on a full channel — confirm.
    #[serde(default = "default_100_u32")]
    pub block_ms_max: u32,
    /// Defaults to 250 (milliseconds). Presumably the delay before records are
    /// spooled to disk — confirm.
    #[serde(default = "default_250_u32")]
    pub spool_after_ms: u32,
    /// Defaults to the relative path `./obs-audit-spool`.
    #[serde(default = "default_audit_dir")]
    pub spool_dir: std::path::PathBuf,
    /// Defaults to 1 GiB (`1 << 30`). Presumably a cap on spool size on disk.
    #[serde(default = "default_1gib")]
    pub spool_max_bytes: u64,
    /// Defaults to [`AuditFailureMode::Panic`].
    #[serde(default)]
    pub on_failure: AuditFailureMode,
    /// Defaults to [`AuditFsyncMode::PerBatch`].
    #[serde(default)]
    pub fsync_mode: AuditFsyncMode,
}
impl Default for AuditConfig {
    /// Uses the same values as the per-field serde defaults, so this equals
    /// deserializing an empty `audit:` mapping.
    fn default() -> Self {
        let spool_dir = default_audit_dir();
        Self {
            on_failure: Default::default(),
            fsync_mode: Default::default(),
            channel_capacity: default_1024_u32(),
            block_ms_max: default_100_u32(),
            spool_after_ms: default_250_u32(),
            spool_max_bytes: default_1gib(),
            spool_dir,
        }
    }
}
/// Value of `audit.fsync_mode`; written in YAML as `none`, `per_batch` or `per_record`.
///
/// NOTE(review): the names suggest how often the audit writer fsyncs — confirm with
/// the writer implementation.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuditFsyncMode {
    /// `none`.
    None,
    /// `per_batch` — the default.
    #[default]
    PerBatch,
    /// `per_record`.
    PerRecord,
}
/// Value of `audit.on_failure`; written in YAML as `panic`, `abort` or `warn_only`.
///
/// NOTE(review): the reaction to each mode is implemented by the audit pipeline,
/// not here — confirm there.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuditFailureMode {
    /// `panic` — the default.
    #[default]
    Panic,
    /// `abort`.
    Abort,
    /// `warn_only`.
    WarnOnly,
}
/// `sinks:` section of `obs.yaml`.
///
/// Each sink's settings are kept as an untyped JSON value; an omitted sink is `null`
/// (`serde_json::Value::default()`). Sink names other than the five below are
/// rejected. The per-sink schema is interpreted elsewhere — TODO confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct SinksConfig {
    #[serde(default)]
    pub stdout: serde_json::Value,
    #[serde(default)]
    pub otlp: serde_json::Value,
    #[serde(default)]
    pub ndjson: serde_json::Value,
    #[serde(default)]
    pub parquet: serde_json::Value,
    #[serde(default)]
    pub clickhouse: serde_json::Value,
}
/// `queues:` section of `obs.yaml`: one capacity per signal kind.
///
/// Each defaults to 8192, and `EventsConfig::validate` requires each to be at least
/// 64. NOTE(review): presumably the bounds of the per-signal queues — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct QueuesConfig {
    /// Log queue capacity.
    #[serde(default = "default_8192_u32")]
    pub log: u32,
    /// Metric queue capacity.
    #[serde(default = "default_8192_u32")]
    pub metric: u32,
    /// Trace queue capacity.
    #[serde(default = "default_8192_u32")]
    pub trace: u32,
}
impl Default for QueuesConfig {
    /// Every queue gets the serde default capacity, matching an empty `queues:` mapping.
    fn default() -> Self {
        let depth = default_8192_u32();
        Self {
            log: depth,
            metric: depth,
            trace: depth,
        }
    }
}
/// `service:` section of `obs.yaml`: identity metadata for the emitting service.
///
/// All fields are optional. serde's derive treats a missing `Option` field as `None`
/// even without `#[serde(default)]`, and `extra` defaults to an empty map.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[non_exhaustive]
pub struct ServiceConfig {
    pub name: Option<String>,
    pub version: Option<String>,
    pub instance: Option<String>,
    pub namespace: Option<String>,
    pub environment: Option<String>,
    /// Additional free-form key/value pairs (sorted by key, since this is a `BTreeMap`).
    #[serde(default)]
    pub extra: BTreeMap<String, String>,
}
/// Errors produced while loading or validating an [`EventsConfig`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ConfigError {
    /// A value failed `EventsConfig::validate`. `field` is the dotted config path of the
    /// offending setting; `detail` explains the allowed range.
    #[error("invalid range for `{field}`: {detail}")]
    InvalidRange {
        field: &'static str,
        detail: String,
    },
    /// The config file could not be read (`EventsConfig::from_yaml_path`); the message
    /// is `"<path>: <io error>"`.
    #[error("io: {0}")]
    Io(String),
    /// YAML parse or schema error from `EventsConfig::from_yaml_str`, possibly with a
    /// hint listing the valid root keys.
    #[error("yaml: {0}")]
    Yaml(String),
}
impl ConfigError {
    /// Builds a [`ConfigError::InvalidRange`] for the dotted config path `field`,
    /// taking ownership of `detail`.
    pub(crate) fn invalid_range(field: &'static str, detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        Self::InvalidRange { detail, field }
    }
}
/// Fluent builder for [`EventsConfig`], obtained from `EventsConfig::builder()`.
///
/// Starts from `EventsConfig::default()`; each setter replaces one field wholesale.
#[derive(Debug, Default)]
pub struct EventsConfigBuilder {
    cfg: EventsConfig,
}
impl EventsConfigBuilder {
    /// Sets `filter` to `Some(s)`.
    #[must_use]
    pub fn filter(mut self, s: impl Into<String>) -> Self {
        self.cfg.filter = Some(s.into());
        self
    }

    /// Replaces the whole `sampling` section.
    #[must_use]
    pub fn sampling(mut self, s: SamplingConfig) -> Self {
        self.cfg.sampling = s;
        self
    }

    /// Replaces the whole `limits` section.
    #[must_use]
    pub fn limits(mut self, l: LimitsConfig) -> Self {
        self.cfg.limits = l;
        self
    }

    /// Replaces the whole `queues` section.
    #[must_use]
    pub fn queues(mut self, q: QueuesConfig) -> Self {
        self.cfg.queues = q;
        self
    }

    /// Replaces the whole `audit` section.
    #[must_use]
    pub fn audit(mut self, a: AuditConfig) -> Self {
        self.cfg.audit = a;
        self
    }

    /// Replaces the whole `sinks` section.
    #[must_use]
    pub fn sinks(mut self, s: SinksConfig) -> Self {
        self.cfg.sinks = s;
        self
    }

    /// Replaces the whole `service` section.
    #[must_use]
    pub fn service(mut self, s: ServiceConfig) -> Self {
        self.cfg.service = s;
        self
    }

    /// Sets `dev_mode`, so that every `EventsConfig` field has a setter.
    #[must_use]
    pub fn dev_mode(mut self, enabled: bool) -> Self {
        self.cfg.dev_mode = enabled;
        self
    }

    /// Returns the assembled config. Does not call `EventsConfig::validate`.
    #[must_use]
    pub fn build(self) -> EventsConfig {
        self.cfg
    }
}
/// Serde default for `sampling.default_rate`: the top of the `[0.0, 1.0]` range that
/// `validate` accepts.
const fn default_one_f64() -> f64 {
    1.0_f64
}
/// Serde default for `sampling.honour_traceparent_sampled`.
const fn default_true() -> bool {
    true
}
/// Serde default for `sampling.always_log_at_or_above`.
const fn default_warn() -> Severity {
    Severity::Warn
}
/// Serde default for `sampling.tail_buffer_capacity`.
const fn default_64_u16() -> u16 {
    64
}
/// Serde default for `limits.max_payload_bytes`: 256 KiB.
const fn default_256kib_u32() -> u32 {
    256 << 10
}
/// Serde default for `limits.max_label_value_bytes`: 1 KiB.
const fn default_1kib_u16() -> u16 {
    1 << 10
}
/// Serde default for each of `queues.log`, `queues.metric` and `queues.trace`.
const fn default_8192_u32() -> u32 {
    8 << 10
}
/// Serde default for `audit.block_ms_max`.
const fn default_100_u32() -> u32 {
    100
}
/// Serde default for `audit.spool_after_ms`.
const fn default_250_u32() -> u32 {
    250
}
/// Serde default for `audit.channel_capacity`.
const fn default_1024_u32() -> u32 {
    1 << 10
}
/// Serde default for `audit.spool_max_bytes`: 1 GiB.
const fn default_1gib() -> u64 {
    1024 * 1024 * 1024
}
/// Serde default for `audit.spool_dir`: a path relative to the working directory.
fn default_audit_dir() -> std::path::PathBuf {
    "./obs-audit-spool".into()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_validate_default() {
        EventsConfig::default().validate().unwrap();
    }

    #[test]
    fn test_should_reject_bad_rate() {
        let mut cfg = EventsConfig::default();
        cfg.sampling.default_rate = 1.5;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_should_reject_nan_rate() {
        let mut cfg = EventsConfig::default();
        cfg.sampling.default_rate = f64::NAN;
        assert!(cfg.validate().is_err(), "NaN must not pass the [0, 1] range check");
    }

    #[test]
    fn test_should_reject_bad_per_event_rate() {
        let mut cfg = EventsConfig::default();
        cfg.sampling.per_event.insert("login".to_string(), -0.1);
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_should_reject_tiny_payload_cap() {
        let mut cfg = EventsConfig::default();
        cfg.limits.max_payload_bytes = 100;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_should_reject_huge_payload_cap() {
        let mut cfg = EventsConfig::default();
        cfg.limits.max_payload_bytes = 16 * 1024 * 1024 + 1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_should_reject_tiny_queue() {
        let mut cfg = EventsConfig::default();
        cfg.queues.trace = 63;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_should_round_trip_yaml() {
        let cfg = EventsConfig::builder()
            .filter("info")
            .sampling(SamplingConfig {
                default_rate: 0.5,
                ..Default::default()
            })
            .build();
        let s = serde_yaml::to_string(&cfg).unwrap();
        let cfg2: EventsConfig = serde_yaml::from_str(&s).unwrap();
        assert_eq!(cfg.filter, cfg2.filter);
        assert!((cfg.sampling.default_rate - cfg2.sampling.default_rate).abs() < f64::EPSILON);
    }

    #[test]
    fn test_should_reject_unknown_fields() {
        let yaml = "filter: info\nbogus_field: 42\n";
        let result: Result<EventsConfig, _> = serde_yaml::from_str(yaml);
        assert!(result.is_err(), "deny_unknown_fields must reject unknown keys");
    }

    #[test]
    fn test_should_reject_unknown_nested_fields() {
        let yaml = "sampling:\n  bogus_field: 1\n";
        assert!(EventsConfig::from_yaml_str(yaml).is_err());
    }

    #[test]
    fn test_from_yaml_str_should_hint_valid_root_keys_on_typo() {
        let yaml = "filtr: info\n";
        let err = EventsConfig::from_yaml_str(yaml).expect_err("unknown field");
        let s = err.to_string();
        assert!(
            s.contains("unknown field"),
            "raw serde error preserved: {s}"
        );
        assert!(
            s.contains("valid obs.yaml root keys"),
            "hint must list valid keys: {s}",
        );
        assert!(s.contains("filter"), "hint must enumerate `filter`: {s}");
        assert!(
            s.contains("dev_mode"),
            "hint must enumerate `dev_mode`: {s}"
        );
    }

    /// Checks both directions: every serialized field is listed in `VALID_ROOT_KEYS`,
    /// and every listed key is a real field (no stale entries).
    #[test]
    fn test_valid_root_keys_cover_struct_fields() {
        let cfg = EventsConfig::default();
        let value = serde_yaml::to_value(&cfg).expect("serialize default");
        let map = value.as_mapping().expect("config serializes as mapping");
        let serialized: Vec<&str> = map
            .keys()
            .map(|key| key.as_str().expect("key is string"))
            .collect();
        for k in &serialized {
            assert!(
                VALID_ROOT_KEYS.contains(k),
                "EventsConfig field `{k}` missing from VALID_ROOT_KEYS; update the list so the \
                 hint keeps covering every valid root key",
            );
        }
        for k in VALID_ROOT_KEYS {
            assert!(
                serialized.contains(k),
                "VALID_ROOT_KEYS lists `{k}`, which is not an EventsConfig field; remove or \
                 rename it",
            );
        }
    }

    #[test]
    fn test_from_yaml_str_should_parse_filter_and_sampling() {
        let yaml = "filter: info\nsampling:\n default_rate: 0.25\n";
        let cfg = EventsConfig::from_yaml_str(yaml).expect("parse");
        assert_eq!(cfg.filter.as_deref(), Some("info"));
        assert!((cfg.sampling.default_rate - 0.25).abs() < f64::EPSILON);
    }

    #[test]
    fn test_from_yaml_str_should_use_default_when_var_unset() {
        let yaml = "filter: ${OBS_NEVER_SET_VAR_XYZ:-info}\n";
        let cfg = EventsConfig::from_yaml_str(yaml).expect("parse");
        assert_eq!(cfg.filter.as_deref(), Some("info"));
    }

    #[test]
    fn test_expand_env_vars_should_keep_unmatched_reference_verbatim() {
        let out = expand_env_vars("${OBS_NEVER_SET_VAR_AAAA}");
        assert_eq!(out, "${OBS_NEVER_SET_VAR_AAAA}");
    }

    #[test]
    fn test_expand_env_vars_should_drop_to_default_for_unset() {
        let out = expand_env_vars("${OBS_NEVER_SET_VAR_BBBB:-fallback}");
        assert_eq!(out, "fallback");
    }

    #[test]
    fn test_expand_env_vars_should_keep_unterminated_reference() {
        let out = expand_env_vars("a: ${OBS_NEVER_SET_VAR_CCCC");
        assert_eq!(out, "a: ${OBS_NEVER_SET_VAR_CCCC");
    }

    #[test]
    fn test_apply_yaml_path_should_walk_nested_keys() {
        let mut root = serde_yaml::Value::Mapping(Default::default());
        apply_yaml_path(
            &mut root,
            &["sampling".to_string(), "default_rate".to_string()],
            "0.5",
        );
        let cfg: EventsConfig = serde_yaml::from_value(root).expect("parse");
        assert!((cfg.sampling.default_rate - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_apply_yaml_path_should_ignore_empty_path() {
        let mut root = serde_yaml::Value::Bool(true);
        apply_yaml_path(&mut root, &[], "1");
        assert_eq!(root, serde_yaml::Value::Bool(true));
    }

    #[test]
    fn test_apply_yaml_path_should_replace_scalar_root() {
        let mut root = serde_yaml::Value::Null;
        apply_yaml_path(
            &mut root,
            &["queues".to_string(), "log".to_string()],
            "128",
        );
        let cfg: EventsConfig = serde_yaml::from_value(root).expect("parse");
        assert_eq!(cfg.queues.log, 128);
    }
}