1#![allow(clippy::disallowed_methods)]
11
12use std::collections::BTreeMap;
13
14use obs_proto::obs::v1::Severity;
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, Default, Serialize, Deserialize)]
20#[serde(deny_unknown_fields, rename_all = "snake_case")]
21#[non_exhaustive]
22pub struct EventsConfig {
23 #[serde(default)]
26 pub filter: Option<String>,
27
28 #[serde(default)]
30 pub sampling: SamplingConfig,
31
32 #[serde(default)]
34 pub limits: LimitsConfig,
35
36 #[serde(default)]
40 pub audit: AuditConfig,
41
42 #[serde(default)]
44 pub queues: QueuesConfig,
45
46 #[serde(default)]
50 pub sinks: SinksConfig,
51
52 #[serde(default)]
54 pub service: ServiceConfig,
55
56 #[serde(default)]
62 pub dev_mode: bool,
63}
64
65impl EventsConfig {
66 #[must_use]
68 pub fn builder() -> EventsConfigBuilder {
69 EventsConfigBuilder::default()
70 }
71
72 pub fn from_yaml_str(yaml: &str) -> Result<Self, ConfigError> {
88 let expanded = expand_env_vars(yaml);
89 serde_yaml::from_str(&expanded)
90 .map_err(|e| ConfigError::Yaml(annotate_yaml_err(e.to_string())))
91 }
92
93 pub fn from_yaml_path(path: impl AsRef<std::path::Path>) -> Result<Self, ConfigError> {
100 let path = path.as_ref();
101 let bytes = std::fs::read_to_string(path)
102 .map_err(|e| ConfigError::Io(format!("{}: {}", path.display(), e)))?;
103 Self::from_yaml_str(&bytes)
104 }
105
106 #[must_use]
114 pub fn merged_with_env(self, prefix: &str) -> Self {
115 let mut overlay = serde_yaml::to_value(&self).unwrap_or(serde_yaml::Value::Null);
116 let prefix_uc = prefix.to_ascii_uppercase();
117 let prefix_with_under = format!("{prefix_uc}_");
118 for (key, value) in std::env::vars() {
119 if !key.starts_with(&prefix_with_under) {
120 continue;
121 }
122 let stripped = match key.strip_prefix(&prefix_with_under) {
123 Some(s) => s,
124 None => continue,
125 };
126 let path: Vec<String> = stripped
128 .split("__")
129 .map(|seg| seg.to_ascii_lowercase())
130 .collect();
131 apply_yaml_path(&mut overlay, &path, &value);
132 }
133 serde_yaml::from_value::<EventsConfig>(overlay).unwrap_or(self)
134 }
135}
136
/// Root-level keys accepted by `EventsConfig`, listed in the hint appended to
/// "unknown field" parse errors.
const VALID_ROOT_KEYS: &[&str] = &[
    "filter",
    "sampling",
    "limits",
    "audit",
    "queues",
    "sinks",
    "service",
    "dev_mode",
];

/// Appends a hint listing [`VALID_ROOT_KEYS`] to `msg` when it mentions an
/// unknown field; any other message is returned untouched.
fn annotate_yaml_err(msg: String) -> String {
    if !msg.contains("unknown field") {
        return msg;
    }
    let keys = VALID_ROOT_KEYS.join(", ");
    format!("{msg}\nhint: valid obs.yaml root keys are: {}", keys)
}
158
159fn apply_yaml_path(root: &mut serde_yaml::Value, path: &[String], value: &str) {
160 let Some((head, tail)) = path.split_first() else {
161 return;
162 };
163 if !root.is_mapping() {
164 *root = serde_yaml::Value::Mapping(Default::default());
165 }
166 let Some(map) = root.as_mapping_mut() else {
167 return;
168 };
169 let key = serde_yaml::Value::String(head.clone());
170 if tail.is_empty() {
171 let parsed: serde_yaml::Value = serde_yaml::from_str(value)
174 .unwrap_or_else(|_| serde_yaml::Value::String(value.to_string()));
175 map.insert(key, parsed);
176 } else {
177 let entry = map
178 .entry(key)
179 .or_insert_with(|| serde_yaml::Value::Mapping(Default::default()));
180 apply_yaml_path(entry, tail, value);
181 }
182}
183
/// Expands `${NAME}` and `${NAME:-default}` references using the process environment.
///
/// Resolution rules for each reference:
///
/// * `NAME` is set to a Unicode value → replaced by that value (an empty value
///   expands to the empty string; the default is *not* used).
/// * `NAME` is unset (or not valid Unicode) and a `:-default` is given →
///   replaced by `default`.
/// * otherwise → the reference is kept verbatim, braces included.
///
/// A `${` with no closing `}` is copied through unchanged. References are not
/// nested: the first `}` after `${` ends the reference.
///
/// Text outside references is copied as whole string slices, so multi-byte
/// UTF-8 characters pass through intact.
fn expand_env_vars(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;

    while let Some((before, after_open)) = rest.split_once("${") {
        out.push_str(before);

        let Some((inner, after_close)) = after_open.split_once('}') else {
            // Unterminated reference: nothing further can expand, emit as-is.
            out.push_str("${");
            out.push_str(after_open);
            return out;
        };

        let (name, default) = match inner.split_once(":-") {
            Some((n, d)) => (n, Some(d)),
            None => (inner, None),
        };

        match (std::env::var(name), default) {
            (Ok(value), _) => out.push_str(&value),
            (Err(_), Some(fallback)) => out.push_str(fallback),
            (Err(_), None) => {
                // Keep the unresolved reference so the problem stays visible downstream.
                out.push_str("${");
                out.push_str(inner);
                out.push('}');
            }
        }
        rest = after_close;
    }

    out.push_str(rest);
    out
}
229
230impl EventsConfig {
231 pub fn validate(&self) -> Result<(), ConfigError> {
238 if !(0.0..=1.0).contains(&self.sampling.default_rate) {
239 return Err(ConfigError::invalid_range(
240 "sampling.default_rate",
241 "must be in [0.0, 1.0]",
242 ));
243 }
244 for (name, rate) in &self.sampling.per_event {
245 if !(0.0..=1.0).contains(rate) {
246 return Err(ConfigError::invalid_range(
247 "sampling.per_event[..]",
248 format!("{name} = {rate} is outside [0.0, 1.0]"),
249 ));
250 }
251 }
252 if self.limits.max_payload_bytes < 1024 {
253 return Err(ConfigError::invalid_range(
254 "limits.max_payload_bytes",
255 "must be ≥ 1 KiB",
256 ));
257 }
258 if self.limits.max_payload_bytes > 16 * 1024 * 1024 {
259 return Err(ConfigError::invalid_range(
260 "limits.max_payload_bytes",
261 "must be ≤ 16 MiB",
262 ));
263 }
264 if self.queues.log < 64 || self.queues.metric < 64 || self.queues.trace < 64 {
265 return Err(ConfigError::invalid_range(
266 "queues.{log,metric,trace}",
267 "must be ≥ 64",
268 ));
269 }
270 Ok(())
271 }
272}
273
274#[derive(Clone, Debug, Serialize, Deserialize)]
276#[serde(deny_unknown_fields, rename_all = "snake_case")]
277#[non_exhaustive]
278pub struct SamplingConfig {
279 #[serde(default = "default_one_f64")]
281 pub default_rate: f64,
282 #[serde(default)]
284 pub per_event: BTreeMap<String, f64>,
285 #[serde(default = "default_warn")]
287 pub always_log_at_or_above: Severity,
288 #[serde(default = "default_64_u16")]
290 pub tail_buffer_capacity: u16,
291 #[serde(default = "default_true")]
293 pub honour_traceparent_sampled: bool,
294}
295
296impl Default for SamplingConfig {
297 fn default() -> Self {
298 Self {
299 default_rate: default_one_f64(),
300 per_event: BTreeMap::new(),
301 always_log_at_or_above: default_warn(),
302 tail_buffer_capacity: default_64_u16(),
303 honour_traceparent_sampled: default_true(),
304 }
305 }
306}
307
308#[derive(Clone, Debug, Serialize, Deserialize)]
310#[serde(deny_unknown_fields, rename_all = "snake_case")]
311#[non_exhaustive]
312pub struct LimitsConfig {
313 #[serde(default = "default_256kib_u32")]
315 pub max_payload_bytes: u32,
316 #[serde(default = "default_1kib_u16")]
318 pub max_label_value_bytes: u16,
319 #[serde(default = "default_256_u16")]
329 pub max_external_string_bytes: u16,
330}
331
332impl Default for LimitsConfig {
333 fn default() -> Self {
334 Self {
335 max_payload_bytes: default_256kib_u32(),
336 max_label_value_bytes: default_1kib_u16(),
337 max_external_string_bytes: default_256_u16(),
338 }
339 }
340}
341
/// Serde default for `LimitsConfig::max_external_string_bytes`: 256.
const fn default_256_u16() -> u16 {
    const BYTES: u16 = 256;
    BYTES
}
345
346#[derive(Clone, Debug, Serialize, Deserialize)]
351#[serde(deny_unknown_fields, rename_all = "snake_case")]
352#[non_exhaustive]
353pub struct AuditConfig {
354 #[serde(default = "default_1024_u32")]
356 pub channel_capacity: u32,
357 #[serde(default = "default_100_u32")]
359 pub block_ms_max: u32,
360 #[serde(default = "default_250_u32")]
362 pub spool_after_ms: u32,
363 #[serde(default = "default_audit_dir")]
365 pub spool_dir: std::path::PathBuf,
366 #[serde(default = "default_1gib")]
368 pub spool_max_bytes: u64,
369 #[serde(default)]
371 pub on_failure: AuditFailureMode,
372 #[serde(default)]
378 pub fsync_mode: AuditFsyncMode,
379}
380
381impl Default for AuditConfig {
382 fn default() -> Self {
383 Self {
384 channel_capacity: default_1024_u32(),
385 block_ms_max: default_100_u32(),
386 spool_after_ms: default_250_u32(),
387 spool_dir: default_audit_dir(),
388 spool_max_bytes: default_1gib(),
389 on_failure: AuditFailureMode::default(),
390 fsync_mode: AuditFsyncMode::default(),
391 }
392 }
393}
394
395#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
398#[serde(rename_all = "snake_case")]
399pub enum AuditFsyncMode {
400 None,
403 #[default]
407 PerBatch,
408 PerRecord,
412}
413
414#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
416#[serde(rename_all = "snake_case")]
417pub enum AuditFailureMode {
418 #[default]
420 Panic,
421 Abort,
423 WarnOnly,
425}
426
427#[derive(Clone, Debug, Default, Serialize, Deserialize)]
432#[serde(deny_unknown_fields, rename_all = "snake_case")]
433#[non_exhaustive]
434pub struct SinksConfig {
435 #[serde(default)]
440 pub stdout: serde_json::Value,
441 #[serde(default)]
444 pub otlp: serde_json::Value,
445 #[serde(default)]
447 pub ndjson: serde_json::Value,
448 #[serde(default)]
450 pub parquet: serde_json::Value,
451 #[serde(default)]
453 pub clickhouse: serde_json::Value,
454}
455
456#[derive(Clone, Debug, Serialize, Deserialize)]
458#[serde(deny_unknown_fields, rename_all = "snake_case")]
459#[non_exhaustive]
460pub struct QueuesConfig {
461 #[serde(default = "default_8192_u32")]
463 pub log: u32,
464 #[serde(default = "default_8192_u32")]
466 pub metric: u32,
467 #[serde(default = "default_8192_u32")]
469 pub trace: u32,
470}
471
472impl Default for QueuesConfig {
473 fn default() -> Self {
474 Self {
475 log: default_8192_u32(),
476 metric: default_8192_u32(),
477 trace: default_8192_u32(),
478 }
479 }
480}
481
482#[derive(Clone, Debug, Default, Serialize, Deserialize)]
484#[serde(deny_unknown_fields, rename_all = "snake_case")]
485#[non_exhaustive]
486pub struct ServiceConfig {
487 pub name: Option<String>,
490 pub version: Option<String>,
492 pub instance: Option<String>,
494 pub namespace: Option<String>,
496 pub environment: Option<String>,
498 #[serde(default)]
500 pub extra: BTreeMap<String, String>,
501}
502
503#[derive(Debug, thiserror::Error)]
505#[non_exhaustive]
506pub enum ConfigError {
507 #[error("invalid range for `{field}`: {detail}")]
509 InvalidRange {
510 field: &'static str,
512 detail: String,
515 },
516 #[error("io: {0}")]
518 Io(String),
519 #[error("yaml: {0}")]
521 Yaml(String),
522}
523
524impl ConfigError {
525 pub(crate) fn invalid_range(field: &'static str, detail: impl Into<String>) -> Self {
528 Self::InvalidRange {
529 field,
530 detail: detail.into(),
531 }
532 }
533}
534
535#[derive(Debug, Default)]
537pub struct EventsConfigBuilder {
538 cfg: EventsConfig,
539}
540
541impl EventsConfigBuilder {
542 #[must_use]
544 pub fn filter(mut self, s: impl Into<String>) -> Self {
545 self.cfg.filter = Some(s.into());
546 self
547 }
548
549 #[must_use]
551 pub fn sampling(mut self, s: SamplingConfig) -> Self {
552 self.cfg.sampling = s;
553 self
554 }
555
556 #[must_use]
558 pub fn limits(mut self, l: LimitsConfig) -> Self {
559 self.cfg.limits = l;
560 self
561 }
562
563 #[must_use]
565 pub fn queues(mut self, q: QueuesConfig) -> Self {
566 self.cfg.queues = q;
567 self
568 }
569
570 #[must_use]
572 pub fn audit(mut self, a: AuditConfig) -> Self {
573 self.cfg.audit = a;
574 self
575 }
576
577 #[must_use]
579 pub fn sinks(mut self, s: SinksConfig) -> Self {
580 self.cfg.sinks = s;
581 self
582 }
583
584 #[must_use]
586 pub fn service(mut self, s: ServiceConfig) -> Self {
587 self.cfg.service = s;
588 self
589 }
590
591 #[must_use]
594 pub fn build(self) -> EventsConfig {
595 self.cfg
596 }
597}
598
/// Serde default for `SamplingConfig::default_rate`: `1.0`.
const fn default_one_f64() -> f64 {
    1.0_f64
}

/// Serde default for `SamplingConfig::honour_traceparent_sampled`: `true`.
const fn default_true() -> bool {
    true
}
605const fn default_warn() -> Severity {
606 Severity::Warn
607}
/// Serde default for `SamplingConfig::tail_buffer_capacity`: 64.
const fn default_64_u16() -> u16 {
    64
}

/// Serde default for `LimitsConfig::max_payload_bytes`: 256 KiB.
const fn default_256kib_u32() -> u32 {
    const KIB: u32 = 1024;
    256 * KIB
}

/// Serde default for `LimitsConfig::max_label_value_bytes`: 1 KiB.
const fn default_1kib_u16() -> u16 {
    1 << 10
}

/// Serde default for every `QueuesConfig` capacity: 8192.
const fn default_8192_u32() -> u32 {
    8 * 1024
}

/// Serde default for `AuditConfig::block_ms_max`: 100.
const fn default_100_u32() -> u32 {
    100
}

/// Serde default for `AuditConfig::spool_after_ms`: 250.
const fn default_250_u32() -> u32 {
    250
}

/// Serde default for `AuditConfig::channel_capacity`: 1024.
const fn default_1024_u32() -> u32 {
    1 << 10
}

/// Serde default for `AuditConfig::spool_max_bytes`: 1 GiB.
const fn default_1gib() -> u64 {
    1024 * 1024 * 1024
}

/// Serde default for `AuditConfig::spool_dir`: `./obs-audit-spool`.
fn default_audit_dir() -> std::path::PathBuf {
    "./obs-audit-spool".into()
}
635
#[cfg(test)]
mod tests {
    use super::*;

    /// `true` when two rates differ by less than `f64::EPSILON`.
    fn same_rate(left: f64, right: f64) -> bool {
        (left - right).abs() < f64::EPSILON
    }

    /// The built-in defaults must satisfy their own validation rules.
    #[test]
    fn test_should_validate_default() {
        let defaults = EventsConfig::default();
        defaults.validate().unwrap();
    }

    /// A default rate above 1.0 is rejected.
    #[test]
    fn test_should_reject_bad_rate() {
        let mut config = EventsConfig::default();
        config.sampling.default_rate = 1.5;
        let outcome = config.validate();
        assert!(outcome.is_err());
    }

    /// A payload cap below 1 KiB is rejected.
    #[test]
    fn test_should_reject_tiny_payload_cap() {
        let mut config = EventsConfig::default();
        config.limits.max_payload_bytes = 100;
        let outcome = config.validate();
        assert!(outcome.is_err());
    }

    /// Serializing and re-parsing keeps the filter and the default rate.
    #[test]
    fn test_should_round_trip_yaml() {
        let sampling = SamplingConfig {
            default_rate: 0.5,
            ..Default::default()
        };
        let original = EventsConfig::builder()
            .filter("info")
            .sampling(sampling)
            .build();
        let encoded = serde_yaml::to_string(&original).unwrap();
        let decoded: EventsConfig = serde_yaml::from_str(&encoded).unwrap();
        assert_eq!(original.filter, decoded.filter);
        assert!(same_rate(
            original.sampling.default_rate,
            decoded.sampling.default_rate
        ));
    }

    /// Root keys that are not struct fields fail deserialization.
    #[test]
    fn test_should_reject_unknown_fields() {
        let yaml = "filter: info\nbogus_field: 42\n";
        let outcome = serde_yaml::from_str::<EventsConfig>(yaml);
        assert!(outcome.is_err(), "unknown_fields must reject unknown keys");
    }

    /// A misspelled root key yields the serde error plus the valid-keys hint.
    #[test]
    fn test_from_yaml_str_should_hint_valid_root_keys_on_typo() {
        let yaml = "filtr: info\n";
        let message = EventsConfig::from_yaml_str(yaml)
            .expect_err("unknown field")
            .to_string();
        let s = message.as_str();
        assert!(
            s.contains("unknown field"),
            "raw serde error preserved: {s}"
        );
        assert!(
            s.contains("valid obs.yaml root keys"),
            "hint must list valid keys: {s}",
        );
        assert!(s.contains("filter"), "hint must enumerate `filter`: {s}");
        assert!(
            s.contains("dev_mode"),
            "hint must enumerate `dev_mode`: {s}"
        );
    }

    /// Every key the default config serializes must appear in `VALID_ROOT_KEYS`.
    #[test]
    fn test_valid_root_keys_cover_struct_fields() {
        let defaults = EventsConfig::default();
        let serialized = serde_yaml::to_value(&defaults).expect("serialize default");
        let fields = serialized
            .as_mapping()
            .expect("config serializes as mapping");
        for field in fields.keys() {
            let k = field.as_str().expect("key is string");
            assert!(
                VALID_ROOT_KEYS.contains(&k),
                "EventsConfig field `{k}` missing from VALID_ROOT_KEYS; update the list so the \
                 hint keeps covering every valid root key",
            );
        }
    }

    /// Plain YAML with a nested section parses into the expected fields.
    #[test]
    fn test_from_yaml_str_should_parse_filter_and_sampling() {
        let yaml = "filter: info\nsampling:\n default_rate: 0.25\n";
        let parsed = EventsConfig::from_yaml_str(yaml).expect("parse");
        assert_eq!(parsed.filter.as_deref(), Some("info"));
        assert!(same_rate(parsed.sampling.default_rate, 0.25));
    }

    /// `${VAR:-default}` falls back to the default when `VAR` is unset.
    #[test]
    fn test_from_yaml_str_should_use_default_when_var_unset() {
        let yaml = "filter: ${OBS_NEVER_SET_VAR_XYZ:-info}\n";
        let parsed = EventsConfig::from_yaml_str(yaml).expect("parse");
        assert_eq!(parsed.filter.as_deref(), Some("info"));
    }

    /// An unset variable without a default is left untouched.
    #[test]
    fn test_expand_env_vars_should_keep_unmatched_reference_verbatim() {
        let expanded = expand_env_vars("${OBS_NEVER_SET_VAR_AAAA}");
        assert_eq!(expanded, "${OBS_NEVER_SET_VAR_AAAA}");
    }

    /// An unset variable with a default expands to that default.
    #[test]
    fn test_expand_env_vars_should_drop_to_default_for_unset() {
        let expanded = expand_env_vars("${OBS_NEVER_SET_VAR_BBBB:-fallback}");
        assert_eq!(expanded, "fallback");
    }

    /// A two-segment path creates the intermediate mapping and sets the leaf.
    #[test]
    fn test_apply_yaml_path_should_walk_nested_keys() {
        let segments = ["sampling".to_string(), "default_rate".to_string()];
        let mut document = serde_yaml::Value::Mapping(Default::default());
        apply_yaml_path(&mut document, &segments, "0.5");
        let parsed: EventsConfig = serde_yaml::from_value(document).expect("parse");
        assert!(same_rate(parsed.sampling.default_rate, 0.5));
    }
}