use crate::observability::config::{
DynamicLogConfig, FuzzyMatchType, FuzzyRuleConfig, FuzzyRuleKind, LogConfig, LogConsoleConfig,
LogFilterConfig, LogFilterOverrideConfig, LogFormat, LogLocalConfig, LogRemoteConfig,
ObservabilityConfig, TraceConfig,
};
use crate::observability::filter::SharedOrderedFilter;
use crate::observability::reload::ObservabilityReloadHandle;
use anyhow::{bail, Context, Result};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime};
const WATCH_POLL_INTERVAL: Duration = Duration::from_secs(1);
const WATCH_RELOAD_DELAY: Duration = Duration::from_secs(10);
pub fn validate_observability_config_path(path: impl AsRef<Path>) -> Result<ObservabilityConfig> {
let path = path.as_ref();
let text =
fs::read_to_string(path).with_context(|| format!("failed to read {}", path.display()))?;
let config: ObservabilityConfig =
toml::from_str(&text).with_context(|| format!("failed to parse {}", path.display()))?;
validate_observability_config(&config)?;
Ok(config)
}
pub fn validate_observability_config(config: &ObservabilityConfig) -> Result<()> {
SharedOrderedFilter::new(config.log.filter.clone())
.context("invalid local log filter configuration")?;
if let Some(filter) = &config.log.remote.filter {
SharedOrderedFilter::new(filter.clone())
.context("invalid remote log filter configuration")?;
}
if let Some(filter) = &config.trace.filter {
SharedOrderedFilter::new(filter.clone()).context("invalid trace filter configuration")?;
}
if config.log.remote.enabled.unwrap_or(false) && config.log.remote.endpoint.trim().is_empty() {
bail!("remote log endpoint cannot be empty when remote logs are enabled");
}
if config.trace.enabled {
validate_otlp_exporter("trace", &config.trace.exporter, &config.trace.endpoint)?;
}
if let Some(metrics) = &config.metrics {
if metrics.enabled {
validate_otlp_exporter("metrics", &metrics.exporter, &metrics.endpoint)?;
}
}
Ok(())
}
pub fn observability_config_from_rust_log(value: Option<&str>) -> Result<ObservabilityConfig> {
let filter = parse_rust_log_filter(value.unwrap_or("info"))?;
Ok(ObservabilityConfig {
log: LogConfig {
format: Some(LogFormat::Compact),
filter,
console: Some(LogConsoleConfig {
enabled: Some(true),
}),
local: LogLocalConfig {
enabled: Some(false),
file_dir: "logs".to_string(),
file_name: "app.log".to_string(),
},
remote: LogRemoteConfig {
enabled: Some(false),
endpoint: String::new(),
filter: None,
},
dynamic: DynamicLogConfig { enabled: true },
},
trace: TraceConfig {
enabled: false,
exporter: "otlp".to_string(),
endpoint: String::new(),
service_name: "pi_logger".to_string(),
service_version: None,
filter: None,
},
metrics: None,
})
}
pub(crate) struct ConfigWatchGuard {
stop: Arc<AtomicBool>,
worker: Option<JoinHandle<()>>,
}
impl ConfigWatchGuard {
pub(crate) fn start(path: PathBuf, reload: ObservabilityReloadHandle) -> Self {
let stop = Arc::new(AtomicBool::new(false));
let worker_stop = stop.clone();
let worker = thread::spawn(move || watch_config_file(path, reload, worker_stop));
Self {
stop,
worker: Some(worker),
}
}
}
impl Drop for ConfigWatchGuard {
fn drop(&mut self) {
self.stop.store(true, Ordering::Release);
if let Some(worker) = self.worker.take() {
let _ = worker.join();
}
}
}
fn parse_rust_log_filter(value: &str) -> Result<LogFilterConfig> {
let mut default_level = "info".to_string();
let mut overrides = Vec::new();
for (index, directive) in value.split(',').map(str::trim).enumerate() {
if directive.is_empty() {
continue;
}
if let Some((target, level)) = directive.split_once('=') {
let target = target.trim();
let level = level.trim();
if target.is_empty() || level.is_empty() || target.contains('[') {
bail!("unsupported RUST_LOG directive `{}`", directive);
}
overrides.push(LogFilterOverrideConfig {
name: format!("rust_log_{index}_{target}"),
enabled: Some(true),
level: level.to_string(),
priority: 100,
fuzzy_rules: vec![FuzzyRuleConfig {
kind: FuzzyRuleKind::Target,
match_type: FuzzyMatchType::Prefix,
pattern: target.to_string(),
level: None,
}],
field_rules: Vec::new(),
});
} else {
default_level = directive.to_string();
}
}
let filter = LogFilterConfig {
default_level,
overrides,
};
SharedOrderedFilter::new(filter.clone()).context("invalid RUST_LOG configuration")?;
Ok(filter)
}
fn validate_otlp_exporter(kind: &str, exporter: &str, endpoint: &str) -> Result<()> {
if exporter != "otlp" {
bail!("unsupported {} exporter `{}`", kind, exporter);
}
if endpoint.trim().is_empty() {
bail!("{} endpoint cannot be empty when enabled", kind);
}
Ok(())
}
fn watch_config_file(path: PathBuf, reload: ObservabilityReloadHandle, stop: Arc<AtomicBool>) {
let mut last_modified = modified_time(&path);
let mut pending_since = None;
while !stop.load(Ordering::Acquire) {
thread::sleep(WATCH_POLL_INTERVAL);
let modified = modified_time(&path);
if modified != last_modified {
last_modified = modified;
pending_since = Some(SystemTime::now());
continue;
}
let Some(since) = pending_since else {
continue;
};
if since.elapsed().unwrap_or_default() < WATCH_RELOAD_DELAY {
continue;
}
pending_since = None;
match validate_observability_config_path(&path)
.and_then(|config| reload_observability_filters(&reload, &config))
{
Ok(()) => eprintln!("reloaded observability filters from {}", path.display()),
Err(error) => eprintln!(
"failed to reload observability filters from {}: {:#}",
path.display(),
error
),
}
}
}
pub fn reload_observability_filters(
reload: &ObservabilityReloadHandle,
config: &ObservabilityConfig,
) -> Result<()> {
validate_observability_config(config)?;
let local_filter = config.log.filter.clone();
let remote_filter = if reload.remote().is_ok() {
Some(if reload.remote_uses_local_filter() {
if config.log.remote.filter.is_some() {
bail!("remote filter changed from shared to independent; restart is required");
}
local_filter.clone()
} else {
config
.log
.remote
.filter
.clone()
.context("remote filter was initialized independently but is now missing")?
})
} else {
None
};
let trace_filter = config
.trace
.filter
.clone()
.unwrap_or_else(|| LogFilterConfig {
default_level: "trace".to_string(),
overrides: Vec::new(),
});
reload.local().reload_filter_unchecked(local_filter)?;
if let (Ok(remote), Some(remote_filter)) = (reload.remote(), remote_filter) {
remote.reload_filter_unchecked(remote_filter)?;
}
if let Ok(trace) = reload.trace() {
trace.reload_filter_unchecked(trace_filter)?;
}
Ok(())
}
fn modified_time(path: &Path) -> Option<SystemTime> {
fs::metadata(path)
.and_then(|metadata| metadata.modified())
.ok()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::observability::reload::FilterReloadHandle;
use std::time::{SystemTime, UNIX_EPOCH};
#[test]
fn rust_log_directives_convert_to_default_and_prefix_overrides() {
let config =
observability_config_from_rust_log(Some("info,my_app=debug,hyper=warn")).unwrap();
assert_eq!(config.log.filter.default_level, "info");
assert_eq!(config.log.filter.overrides.len(), 2);
assert_eq!(config.log.filter.overrides[0].level, "debug");
assert_eq!(
config.log.filter.overrides[0].fuzzy_rules[0].pattern,
"my_app"
);
assert_eq!(
config.log.filter.overrides[0].fuzzy_rules[0].match_type,
FuzzyMatchType::Prefix
);
}
#[test]
fn rust_log_invalid_level_fails_validation() {
let error = observability_config_from_rust_log(Some("info,my_app=verbose")).unwrap_err();
assert!(format!("{error:#}").contains("unsupported log level `verbose`"));
}
#[test]
fn config_path_validation_reads_and_checks_filters() {
let path = std::env::temp_dir().join(format!(
"pi_logger_observability_{}.toml",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
));
fs::write(
&path,
r#"
[log]
format = "compact"
[log.filter]
default_level = "info"
[log.console]
enabled = true
[log.local]
enabled = false
file_dir = "logs"
file_name = "app.log"
[log.remote]
enabled = false
endpoint = ""
[log.dynamic]
enabled = true
[trace]
enabled = false
exporter = "otlp"
endpoint = ""
service_name = "test"
"#,
)
.unwrap();
let config = validate_observability_config_path(&path).unwrap();
assert_eq!(config.log.filter.default_level, "info");
fs::remove_file(path).unwrap();
}
#[test]
fn file_reload_updates_all_initialized_filter_scopes() {
let local = disabled_reload_scope("local", "info");
let remote = disabled_reload_scope("remote", "off");
let trace = disabled_reload_scope("trace", "warn");
let reload = ObservabilityReloadHandle::new(local, Some(remote), Some(trace), false);
let mut config = observability_config_from_rust_log(Some("debug")).unwrap();
config.log.remote.filter = Some(filter_config("error"));
config.trace.filter = Some(filter_config("trace"));
reload_observability_filters(&reload, &config).unwrap();
assert_eq!(
reload.local().current_filter_config().default_level,
"debug"
);
assert_eq!(
reload
.remote()
.unwrap()
.current_filter_config()
.default_level,
"error"
);
assert_eq!(
reload
.trace()
.unwrap()
.current_filter_config()
.default_level,
"trace"
);
}
#[test]
fn missing_independent_remote_filter_does_not_partially_reload_local() {
let local = disabled_reload_scope("local", "info");
let remote = disabled_reload_scope("remote", "off");
let reload = ObservabilityReloadHandle::new(local, Some(remote), None, false);
let config = observability_config_from_rust_log(Some("debug")).unwrap();
assert!(reload_observability_filters(&reload, &config).is_err());
assert_eq!(reload.local().current_filter_config().default_level, "info");
}
fn disabled_reload_scope(scope: &'static str, level: &str) -> FilterReloadHandle {
FilterReloadHandle::new(
false,
scope,
SharedOrderedFilter::new(filter_config(level)).unwrap(),
)
}
fn filter_config(level: &str) -> LogFilterConfig {
LogFilterConfig {
default_level: level.to_string(),
overrides: Vec::new(),
}
}
}