use std::{path::PathBuf, sync::Arc, time::Duration};
use obs_core::{
EventsConfig, FormatterStyle, InMemoryObserver, Sink, StandardObserver, StdoutSink, Tier,
install_observer, install_panic_hook, observer,
};
/// Shutdown budget (250 ms) that `init_for_service` stores in a fresh builder; the resulting
/// `InitGuard` passes it to `shutdown_blocking` on drop unless overridden.
const DEFAULT_SHUTDOWN_BUDGET: Duration = Duration::from_millis(250);
/// Selects how `InitBuilder::install` wires the process-wide observer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ServicePreset {
/// Default preset. `build_observer` routes the log, metric and trace tiers to one shared
/// compact-format stdout sink.
#[default]
Dev,
/// Adds no preset-specific sinks in `build_observer`; only the fallback sink and any explicit
/// per-tier overrides are configured.
Production,
/// `install` installs an `InMemoryObserver` directly and never calls `build_observer`.
InMemory,
}
/// Builder collecting everything `install` needs to set up observability for one service.
///
/// Created by `init_for_service`; nothing is installed until `install` is awaited.
#[must_use = "call .install() to apply the configuration"]
pub struct InitBuilder {
/// Service name forwarded to the observer builder.
service: String,
/// Service version forwarded to the observer builder.
version: String,
/// Explicit instance id; when `None`, `build_observer` falls back to the
/// `OTEL_SERVICE_INSTANCE_ID` environment variable, then to an empty string.
instance: Option<String>,
/// YAML config file read by `install` when no explicit `config` is set; also the file
/// re-read by the SIGHUP reload task.
config_path: Option<PathBuf>,
/// Explicit configuration; takes precedence over `config_path` in `install`.
config: Option<EventsConfig>,
/// Preset deciding which observer and preset sinks get installed.
preset: ServicePreset,
/// Whether `install` calls `install_panic_hook` (true by default).
panic_hook: bool,
/// Filter string handed to the tracing bridge; the bridge is only installed when this is set.
#[cfg(feature = "tracing-bridge")]
tracing_bridge_filter: Option<String>,
/// Whether `install` spawns the SIGHUP reload task (requires `config_path` to be set).
#[cfg(unix)]
sighup_reload: bool,
/// Per-tier sinks registered after the preset sinks and the fallback, in insertion order.
sink_overrides: Vec<(Tier, Arc<dyn Sink>)>,
/// Fallback sink; when `None`, `build_observer` uses `StdoutSink::default()`.
sink_fallback: Option<Arc<dyn Sink>>,
/// Budget copied into the `InitGuard` and passed to `shutdown_blocking` on drop.
shutdown_budget: Duration,
}
/// Hand-written `Debug` that summarises the builder instead of dumping every field.
///
/// The sinks are reported as a count and a presence flag rather than printed
/// themselves (presumably because `dyn Sink` carries no `Debug` bound; verify).
/// `config` and the cfg-gated fields are left out, which the trailing `..` from
/// `finish_non_exhaustive` signals to the reader.
impl std::fmt::Debug for InitBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let override_count = self.sink_overrides.len();
        let has_fallback = self.sink_fallback.is_some();

        let mut out = f.debug_struct("InitBuilder");
        out.field("service", &self.service);
        out.field("version", &self.version);
        out.field("instance", &self.instance);
        out.field("config_path", &self.config_path);
        out.field("preset", &self.preset);
        out.field("panic_hook", &self.panic_hook);
        out.field("sink_override_count", &override_count);
        out.field("has_fallback", &has_fallback);
        out.field("shutdown_budget", &self.shutdown_budget);
        out.finish_non_exhaustive()
    }
}
/// Starts an [`InitBuilder`] for the given service name and version.
///
/// Every other setting starts at its default: the default preset
/// (`ServicePreset::Dev`), panic hook enabled, no config or config path, no
/// instance id, no sink overrides or fallback, SIGHUP reload and tracing bridge
/// off, and `DEFAULT_SHUTDOWN_BUDGET` as the shutdown budget.
pub fn init_for_service(service: impl Into<String>, version: impl Into<String>) -> InitBuilder {
    let service: String = service.into();
    let version: String = version.into();
    InitBuilder {
        service,
        version,
        preset: ServicePreset::default(),
        shutdown_budget: DEFAULT_SHUTDOWN_BUDGET,
        panic_hook: true,
        instance: None,
        config: None,
        config_path: None,
        sink_fallback: None,
        sink_overrides: Vec::new(),
        #[cfg(feature = "tracing-bridge")]
        tracing_bridge_filter: None,
        #[cfg(unix)]
        sighup_reload: false,
    }
}
impl InitBuilder {
    /// Sets an explicit service instance id.
    ///
    /// When unset, `build_observer` falls back to the `OTEL_SERVICE_INSTANCE_ID`
    /// environment variable and then to an empty string.
    pub fn instance(mut self, id: impl Into<String>) -> Self {
        self.instance = Some(id.into());
        self
    }

    /// Sets the YAML config file to load during [`install`](Self::install).
    ///
    /// Ignored for the initial load when an explicit [`config`](Self::config) is
    /// also set, but still used as the file the SIGHUP reload task re-reads.
    pub fn config_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.config_path = Some(path.into());
        self
    }

    /// Supplies the configuration directly; takes precedence over `config_path`.
    pub fn config(mut self, cfg: EventsConfig) -> Self {
        self.config = Some(cfg);
        self
    }

    /// Chooses the [`ServicePreset`] (the builder starts with the default, `Dev`).
    pub fn preset(mut self, preset: ServicePreset) -> Self {
        self.preset = preset;
        self
    }

    /// Sets the budget the returned [`InitGuard`] passes to `shutdown_blocking` on drop.
    pub fn shutdown_budget(mut self, budget: Duration) -> Self {
        self.shutdown_budget = budget;
        self
    }

    /// Controls whether `install` calls `install_panic_hook` (enabled by default).
    pub fn with_panic_hook(mut self, enabled: bool) -> Self {
        self.panic_hook = enabled;
        self
    }

    /// Requests installation of the tracing bridge with the given filter string.
    #[cfg(feature = "tracing-bridge")]
    pub fn with_tracing_bridge(mut self, filter: impl Into<String>) -> Self {
        self.tracing_bridge_filter = Some(filter.into());
        self
    }

    /// Controls whether `install` spawns the SIGHUP config-reload task.
    ///
    /// Only takes effect when a `config_path` is also set.
    #[cfg(unix)]
    pub fn with_sighup_reload(mut self, enabled: bool) -> Self {
        self.sighup_reload = enabled;
        self
    }

    /// Registers a sink for one tier.
    ///
    /// `build_observer` applies these after the preset sinks and the fallback,
    /// in the order they were added. Not used by the `InMemory` preset.
    pub fn with_sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
        self.sink_overrides.push((tier, sink));
        self
    }

    /// Sets the fallback sink; when unset, `build_observer` uses `StdoutSink::default()`.
    pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
        self.sink_fallback = Some(sink);
        self
    }

    /// Applies the configuration: resolves the config, installs the observer,
    /// then the optional panic hook, tracing bridge and SIGHUP reload task.
    ///
    /// The config is resolved (and a `config_path` file read) for every preset,
    /// including `InMemory`, which does not use it afterwards.
    ///
    /// # Errors
    ///
    /// * `InitError::ConfigRead` / `InitError::ConfigParse` when the file at
    ///   `config_path` cannot be read or parsed (only attempted when no explicit
    ///   config was supplied).
    /// * `InitError::Build` when the observer builder fails.
    /// * `InitError::TracingBridge` when the tracing bridge cannot be installed.
    ///   NOTE(review): at that point the observer and panic hook have already
    ///   been installed and no guard is returned.
    pub async fn install(mut self) -> Result<InitGuard, InitError> {
        // `self` is owned and nothing reads `self.config` afterwards, so move the
        // configuration out instead of cloning it.
        let config = match (self.config.take(), self.config_path.as_deref()) {
            (Some(cfg), _) => cfg,
            (None, Some(path)) => load_config(path).await?,
            (None, None) => EventsConfig::default(),
        };

        match self.preset {
            ServicePreset::InMemory => {
                install_observer(InMemoryObserver::new());
            }
            preset => {
                // `config` has no later use, so hand it over by value.
                let built = build_observer(&self, preset, config)?;
                install_observer(built);
            }
        }

        if self.panic_hook {
            install_panic_hook();
        }

        #[cfg(feature = "tracing-bridge")]
        if let Some(filter) = self.tracing_bridge_filter.as_deref() {
            install_tracing_bridge(filter)?;
        }

        // The path is not needed again after this point; move it into the task.
        #[cfg(unix)]
        if self.sighup_reload
            && let Some(path) = self.config_path.take()
        {
            spawn_sighup_reload(path);
        }

        Ok(InitGuard {
            shutdown_budget: self.shutdown_budget,
        })
    }
}
/// Assembles the `StandardObserver` for a non-`InMemory` preset.
///
/// Steps, in order: resolve the instance id (builder value, else the
/// `OTEL_SERVICE_INSTANCE_ID` environment variable, else an empty string), seed
/// the observer builder with service/version/instance/config, add the preset's
/// sinks, set the fallback sink (`StdoutSink::default()` when none was given),
/// then register the caller's per-tier sinks in insertion order.
///
/// # Errors
///
/// Returns `InitError::Build` when the underlying builder's `build()` fails.
///
/// # Panics
///
/// Panics via `unreachable!` when called with `ServicePreset::InMemory`, which
/// `install` handles before reaching this function.
fn build_observer(
    b: &InitBuilder,
    preset: ServicePreset,
    config: EventsConfig,
) -> Result<StandardObserver, InitError> {
    let instance = match &b.instance {
        Some(id) => id.clone(),
        None => std::env::var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_default(),
    };

    let seeded = StandardObserver::builder()
        .service(b.service.clone(), b.version.clone())
        .instance(instance)
        .config(config);

    let with_preset = match preset {
        ServicePreset::Dev => {
            // One compact stdout sink shared by all three tiers.
            let compact: Arc<dyn Sink> = Arc::new(StdoutSink::new(FormatterStyle::Compact));
            [Tier::Log, Tier::Metric, Tier::Trace]
                .into_iter()
                .fold(seeded, |acc, tier| acc.sink_for(tier, Arc::clone(&compact)))
        }
        ServicePreset::Production => seeded,
        ServicePreset::InMemory => unreachable!("handled in install()"),
    };

    let fallback: Arc<dyn Sink> = match b.sink_fallback.as_ref() {
        Some(sink) => Arc::clone(sink),
        None => Arc::new(StdoutSink::default()),
    };

    // Caller-supplied sinks go in last, after the preset sinks and the fallback.
    b.sink_overrides
        .iter()
        .fold(with_preset.sink_fallback(fallback), |acc, (tier, sink)| {
            acc.sink_for(*tier, Arc::clone(sink))
        })
        .build()
        .map_err(InitError::Build)
}
/// Reads the file at `path` as text and parses it as a YAML `EventsConfig`.
///
/// # Errors
///
/// * `InitError::ConfigRead` when the file cannot be read.
/// * `InitError::ConfigParse` when `EventsConfig::from_yaml_str` rejects the text.
///
/// Both variants carry a copy of `path` alongside the underlying error.
async fn load_config(path: &std::path::Path) -> Result<EventsConfig, InitError> {
    let yaml = match tokio::fs::read_to_string(path).await {
        Ok(text) => text,
        Err(source) => {
            return Err(InitError::ConfigRead {
                path: path.to_path_buf(),
                source,
            });
        }
    };

    let parsed = EventsConfig::from_yaml_str(&yaml);
    parsed.map_err(|source| InitError::ConfigParse {
        path: path.to_path_buf(),
        source,
    })
}
/// Installs the tracing bridge at most once per process.
///
/// `obs_tracing_bridge::init` is called only by the first caller; its outcome
/// is cached and every later call returns that same outcome. As a result
/// `filter` only matters on the first call.
///
/// # Errors
///
/// Returns `InitError::TracingBridge` carrying the message from the first
/// (and only) initialisation attempt when that attempt failed.
#[cfg(feature = "tracing-bridge")]
fn install_tracing_bridge(filter: &str) -> Result<(), InitError> {
    use std::sync::OnceLock;
    // Cache the outcome itself rather than a bare "attempted" marker: with a
    // `OnceLock<()>` a failed first attempt still marks the cell initialised,
    // so every later call would report `Ok(())` for a bridge that was never
    // installed.
    static OUTCOME: OnceLock<Result<(), String>> = OnceLock::new();
    OUTCOME
        .get_or_init(|| obs_tracing_bridge::init(Some(filter)).map(|_| ()))
        .clone()
        .map_err(InitError::TracingBridge)
}
/// Spawns a detached tokio task that re-reads `path` on every SIGHUP and hands
/// the parsed config to the installed observer.
///
/// Best effort throughout: if the signal handler cannot be registered the task
/// ends immediately, and a failed read, failed parse, missing standard observer
/// or failed `reload_config` is skipped without being reported.
///
/// NOTE(review): `observer_as_standard` currently always returns `None`, so the
/// `reload_config` call below is never reached.
#[cfg(unix)]
fn spawn_sighup_reload(path: PathBuf) {
    use tokio::signal::unix::{SignalKind, signal};
    tokio::spawn(async move {
        let mut hangups = match signal(SignalKind::hangup()) {
            Ok(stream) => stream,
            Err(_) => return,
        };
        loop {
            // `None` means the signal stream has ended; stop listening.
            if hangups.recv().await.is_none() {
                break;
            }
            let Ok(text) = tokio::fs::read_to_string(&path).await else {
                continue;
            };
            let Ok(next) = EventsConfig::from_yaml_str(&text) else {
                continue;
            };
            let Some(std_obs) = observer_as_standard() else {
                continue;
            };
            let _ = std_obs.reload_config(next);
        }
    });
}
/// Stub lookup of the installed observer as a `StandardObserver`; currently always
/// returns `None`.
///
/// NOTE(review): presumably this is meant to downcast the global observer — TODO confirm and
/// implement. Until it does, the SIGHUP task in `spawn_sighup_reload` never reaches its
/// `reload_config` call.
#[cfg(unix)]
fn observer_as_standard() -> Option<Arc<StandardObserver>> {
// The global observer is fetched and discarded; nothing is derived from it yet.
let _ = observer();
None
}
/// Guard returned by `InitBuilder::install`.
///
/// Dropping it calls `shutdown_blocking` on the current observer with the stored budget.
#[must_use = "dropping the guard drains the observer — keep it alive for the lifetime of the \
process"]
#[derive(Debug)]
pub struct InitGuard {
/// Budget passed to `shutdown_blocking` when the guard is dropped.
shutdown_budget: Duration,
}
impl Drop for InitGuard {
    /// Calls `shutdown_blocking` on the observer returned by `observer()`,
    /// passing the budget stored in this guard.
    fn drop(&mut self) {
        let budget = self.shutdown_budget;
        observer().shutdown_blocking(budget);
    }
}
/// Errors returned by `InitBuilder::install`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum InitError {
/// The config file at `path` could not be read.
#[error("read obs config `{}`: {source}", path.display())]
ConfigRead {
/// Path that was being read.
path: PathBuf,
/// Underlying I/O error.
#[source]
source: std::io::Error,
},
/// The config file at `path` was read but could not be parsed.
#[error("parse obs config `{}`: {source}", path.display())]
ConfigParse {
/// Path whose contents failed to parse.
path: PathBuf,
/// Underlying parse error from `obs_core`.
#[source]
source: obs_core::config::ConfigError,
},
/// The observer builder's `build()` failed.
#[error("build observer: {0}")]
Build(#[from] obs_core::observer::BuildError),
/// Installing the tracing bridge failed; carries the bridge's error message.
#[cfg(feature = "tracing-bridge")]
#[error("install tracing bridge: {0}")]
TracingBridge(String),
}
// Unit tests for the builder defaults and the `install` paths.
// The panic hook is disabled in every `install` test via `with_panic_hook(false)`.
#[cfg(test)]
mod tests {
use super::*;
// A fresh builder keeps the given name/version and the documented defaults.
#[test]
fn test_builder_defaults_are_sensible() {
let b = init_for_service("svc", "0.1.0");
assert_eq!(b.service, "svc");
assert_eq!(b.version, "0.1.0");
assert_eq!(b.preset, ServicePreset::Dev);
assert!(b.panic_hook);
assert_eq!(b.shutdown_budget, DEFAULT_SHUTDOWN_BUDGET);
assert!(b.sink_overrides.is_empty());
}
// The in-memory preset installs successfully; only success of `install` is asserted.
#[tokio::test]
async fn test_install_in_memory_preset_wires_in_memory_observer() {
let guard = init_for_service("svc", "0.1.0")
.preset(ServicePreset::InMemory)
.with_panic_hook(false)
.install()
.await
.expect("install");
drop(guard);
}
// With neither a config nor a config path, the Dev preset builds from the default config.
#[tokio::test]
async fn test_install_dev_preset_builds_without_config_path() {
let guard = init_for_service("svc", "0.1.0")
.preset(ServicePreset::Dev)
.with_panic_hook(false)
.install()
.await
.expect("install");
drop(guard);
}
// A config path that does not exist surfaces as `InitError::ConfigRead`.
#[tokio::test]
async fn test_install_returns_config_read_error_for_missing_path() {
let err = init_for_service("svc", "0.1.0")
.preset(ServicePreset::Dev)
.with_panic_hook(false)
.config_path("/definitely/does/not/exist.yaml")
.install()
.await
.expect_err("missing path must fail");
assert!(matches!(err, InitError::ConfigRead { .. }));
}
}