Skip to main content

obs_kit/
init.rs

1//! `init_for_service` — one-call bootstrap for obs consumers.
2//!
3//! Boundary-review § 3.6 / § 4.3. Collapses the 200-300 LoC
4//! "constructor-to-running-observer" path every consumer was
5//! re-discovering: config load → observer build → install → panic hook
6//! → tracing bridge → optional SIGHUP reload → RAII drain-on-drop.
7//!
8//! A typical `main.rs` now looks like:
9//!
10//! ```no_run
11//! use obs_kit::{ServicePreset, init_for_service};
12//!
13//! # async fn demo() -> anyhow::Result<()> {
14//! let _obs = init_for_service("my-service", env!("CARGO_PKG_VERSION"))
15//!     .instance("pod-abc123")
16//!     .config_path("/etc/my-service/obs.yaml")
17//!     .preset(ServicePreset::Production)
18//!     .install()
19//!     .await?;
20//! # Ok(()) }
21//! ```
22//!
23//! Consumers that want per-tier overrides (fan-out, live-tail mirror)
24//! compose with [`InitBuilder::with_sink_for`]:
25//!
26//! ```no_run
27//! # use std::sync::Arc;
28//! use obs_kit::{
29//!     FanOutSink, NdjsonFileSink, RollingFileWriterBuilder, RollingPolicy, Sink, Tier,
30//! };
31//! # use obs_kit::{ServicePreset, init_for_service};
32//! #
33//! # async fn demo() -> anyhow::Result<()> {
34//! let writer = RollingFileWriterBuilder::default()
35//!     .directory("./.tok-dev")
36//!     .filename_prefix("audit")
37//!     .filename_suffix(".ndjson")
38//!     .policy(RollingPolicy::Daily)
39//!     .build()?;
40//! let audit: Arc<dyn Sink> = Arc::new(NdjsonFileSink::new(writer));
41//! let _obs = init_for_service("my-service", env!("CARGO_PKG_VERSION"))
42//!     .preset(ServicePreset::Dev)
43//!     .with_sink_for(Tier::Audit, audit)
44//!     .install()
45//!     .await?;
46//! # Ok(()) }
47//! ```
48
49use std::{path::PathBuf, sync::Arc, time::Duration};
50
51use obs_core::{
52    EventsConfig, FormatterStyle, InMemoryObserver, Sink, StandardObserver, StdoutSink, Tier,
53    install_observer, install_panic_hook, observer,
54};
55
/// Default time budget for draining the observer queue when the
/// [`InitGuard`] is dropped: 250 ms. Matches the tok `TokObsGuard`
/// default — long enough for most in-flight envelopes to flush, short
/// enough that a wedged sink cannot hang process exit.
const DEFAULT_SHUTDOWN_BUDGET: Duration = Duration::from_millis(250);
60
/// Built-in service presets.
///
/// Covers the three shapes every consumer eventually wants. Non-preset
/// wiring (custom sinks, fan-out, etc.) composes on top via
/// [`InitBuilder::with_sink_for`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ServicePreset {
    /// Compact stdout on the LOG / METRIC / TRACE tiers. AUDIT gets no
    /// preset binding, so audit records go to the fallback sink (a
    /// default [`StdoutSink`] unless [`InitBuilder::sink_fallback`]
    /// overrides it); supply a real audit sink with
    /// [`InitBuilder::with_sink_for`]. No OTLP. Default — matches the
    /// shape local dev + CI smoke tests want out of the box.
    #[default]
    Dev,
    /// Production wiring: the preset binds **no** per-tier sinks. OTLP
    /// lives in a separate crate, so callers hand OTLP sinks in
    /// through [`InitBuilder::with_sink_for`]. Until they do, every
    /// tier — AUDIT included — goes to the fallback sink (a default
    /// [`StdoutSink`] unless [`InitBuilder::sink_fallback`] overrides
    /// it), which keeps emits visible.
    Production,
    /// Tests: wire [`InMemoryObserver`] directly. Sink overrides, the
    /// fallback sink and the resolved config are not applied to the
    /// observer (a `config_path`, if set, is still read and parsed, so
    /// a bad path or file still fails `install`). `assert_emitted!` and
    /// `InMemoryHandle::drain` read the captured stream.
    InMemory,
}
84
85/// Builder returned by [`init_for_service`]. Configure the service,
86/// preset, and any per-tier sink overrides; finish with
87/// [`Self::install`].
88#[must_use = "call .install() to apply the configuration"]
89pub struct InitBuilder {
90    service: String,
91    version: String,
92    instance: Option<String>,
93    config_path: Option<PathBuf>,
94    config: Option<EventsConfig>,
95    preset: ServicePreset,
96    panic_hook: bool,
97    #[cfg(feature = "tracing-bridge")]
98    tracing_bridge_filter: Option<String>,
99    #[cfg(unix)]
100    sighup_reload: bool,
101    sink_overrides: Vec<(Tier, Arc<dyn Sink>)>,
102    sink_fallback: Option<Arc<dyn Sink>>,
103    shutdown_budget: Duration,
104}
105
106impl std::fmt::Debug for InitBuilder {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("InitBuilder")
109            .field("service", &self.service)
110            .field("version", &self.version)
111            .field("instance", &self.instance)
112            .field("config_path", &self.config_path)
113            .field("preset", &self.preset)
114            .field("panic_hook", &self.panic_hook)
115            .field("sink_override_count", &self.sink_overrides.len())
116            .field("has_fallback", &self.sink_fallback.is_some())
117            .field("shutdown_budget", &self.shutdown_budget)
118            .finish_non_exhaustive()
119    }
120}
121
122/// Entry point. `service` and `version` become the `service.name` /
123/// `service.version` resource attributes on every envelope.
124pub fn init_for_service(service: impl Into<String>, version: impl Into<String>) -> InitBuilder {
125    InitBuilder {
126        service: service.into(),
127        version: version.into(),
128        instance: None,
129        config_path: None,
130        config: None,
131        preset: ServicePreset::default(),
132        panic_hook: true,
133        #[cfg(feature = "tracing-bridge")]
134        tracing_bridge_filter: None,
135        #[cfg(unix)]
136        sighup_reload: false,
137        sink_overrides: Vec::new(),
138        sink_fallback: None,
139        shutdown_budget: DEFAULT_SHUTDOWN_BUDGET,
140    }
141}
142
143impl InitBuilder {
144    /// Override the instance identity (hostname, pod id, VM id). When
145    /// unset, `OTEL_SERVICE_INSTANCE_ID` env is consulted, then empty.
146    pub fn instance(mut self, id: impl Into<String>) -> Self {
147        self.instance = Some(id.into());
148        self
149    }
150
151    /// Load `EventsConfig` from `path`. YAML root must parse as
152    /// [`EventsConfig`] — typos surface with the keys-hint from
153    /// [`EventsConfig::from_yaml_str`].
154    pub fn config_path(mut self, path: impl Into<PathBuf>) -> Self {
155        self.config_path = Some(path.into());
156        self
157    }
158
159    /// Supply an [`EventsConfig`] directly — bypasses `config_path`.
160    /// Useful when the consumer already owns the config struct.
161    pub fn config(mut self, cfg: EventsConfig) -> Self {
162        self.config = Some(cfg);
163        self
164    }
165
166    /// Select a preset (default: [`ServicePreset::Dev`]).
167    pub fn preset(mut self, preset: ServicePreset) -> Self {
168        self.preset = preset;
169        self
170    }
171
172    /// Override the default shutdown drain budget (default 250 ms).
173    pub fn shutdown_budget(mut self, budget: Duration) -> Self {
174        self.shutdown_budget = budget;
175        self
176    }
177
178    /// Disable the panic hook (installed by default).
179    pub fn with_panic_hook(mut self, enabled: bool) -> Self {
180        self.panic_hook = enabled;
181        self
182    }
183
184    /// Install the `tracing → obs` bridge with the supplied filter
185    /// directive (`RUST_LOG` shape). Requires the `tracing-bridge`
186    /// feature on `obs-kit`.
187    #[cfg(feature = "tracing-bridge")]
188    pub fn with_tracing_bridge(mut self, filter: impl Into<String>) -> Self {
189        self.tracing_bridge_filter = Some(filter.into());
190        self
191    }
192
193    /// Spawn a SIGHUP handler that re-parses `config_path` and calls
194    /// `StandardObserver::reload_config`. Only takes effect when
195    /// `config_path` is set. Unix-only.
196    #[cfg(unix)]
197    pub fn with_sighup_reload(mut self, enabled: bool) -> Self {
198        self.sighup_reload = enabled;
199        self
200    }
201
202    /// Attach an additional sink for `tier`. Called before
203    /// [`Self::install`]; composes on top of the chosen preset. To fan
204    /// out to multiple sinks on the same tier, construct a
205    /// [`obs_core::FanOutSink`] and pass it here.
206    pub fn with_sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
207        self.sink_overrides.push((tier, sink));
208        self
209    }
210
211    /// Override the fallback sink (the sink tiers without a specific
212    /// binding route to). When unset, defaults to
213    /// [`StdoutSink::default`] for Dev, none for Production, and
214    /// unused for InMemory.
215    pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
216        self.sink_fallback = Some(sink);
217        self
218    }
219
220    /// Build the observer, install it, install the panic hook (when
221    /// enabled), install the tracing bridge (when configured), spawn
222    /// the SIGHUP task (when configured), and return an RAII guard
223    /// that drains the queue on drop.
224    ///
225    /// # Errors
226    ///
227    /// Returns `anyhow::Error` when the config file cannot be read,
228    /// when parsing / validation fails, when the observer builder
229    /// rejects the config, or when the tracing bridge has already been
230    /// installed.
231    pub async fn install(self) -> Result<InitGuard, InitError> {
232        // 1. Resolve the config source.
233        let config = match (self.config.clone(), self.config_path.as_ref()) {
234            (Some(cfg), _) => cfg,
235            (None, Some(path)) => load_config(path).await?,
236            (None, None) => EventsConfig::default(),
237        };
238
239        // 2. Route on preset.
240        match self.preset {
241            ServicePreset::InMemory => {
242                install_observer(InMemoryObserver::new());
243            }
244            preset => {
245                let observer = build_observer(&self, preset, config.clone())?;
246                install_observer(observer);
247            }
248        }
249
250        // 3. Panic hook.
251        if self.panic_hook {
252            install_panic_hook();
253        }
254
255        // 4. Tracing bridge (feature-gated).
256        #[cfg(feature = "tracing-bridge")]
257        if let Some(ref filter) = self.tracing_bridge_filter {
258            install_tracing_bridge(filter)?;
259        }
260
261        // 5. SIGHUP reload.
262        #[cfg(unix)]
263        if self.sighup_reload
264            && let Some(path) = self.config_path.clone()
265        {
266            spawn_sighup_reload(path);
267        }
268
269        Ok(InitGuard {
270            shutdown_budget: self.shutdown_budget,
271        })
272    }
273}
274
275fn build_observer(
276    b: &InitBuilder,
277    preset: ServicePreset,
278    config: EventsConfig,
279) -> Result<StandardObserver, InitError> {
280    let instance = b
281        .instance
282        .clone()
283        .or_else(|| std::env::var("OTEL_SERVICE_INSTANCE_ID").ok())
284        .unwrap_or_default();
285
286    let mut builder = StandardObserver::builder()
287        .service(b.service.clone(), b.version.clone())
288        .instance(instance)
289        .config(config);
290
291    // Preset baseline.
292    match preset {
293        ServicePreset::Dev => {
294            let compact: Arc<dyn Sink> = Arc::new(StdoutSink::new(FormatterStyle::Compact));
295            builder = builder
296                .sink_for(Tier::Log, Arc::clone(&compact))
297                .sink_for(Tier::Metric, Arc::clone(&compact))
298                .sink_for(Tier::Trace, Arc::clone(&compact));
299            // AUDIT: unwired — stdout is the wrong place for audit
300            // records and the preset declines to guess. Callers supply
301            // a concrete audit sink via `with_sink_for(Tier::Audit, …)`.
302        }
303        ServicePreset::Production => {
304            // No OTLP wiring here — obs-otel is a separate crate, and
305            // forcing the feature-graph into obs-kit's init path would
306            // either bloat the default build or leak feature flags out
307            // of the façade. Production callers wire OTLP sinks
308            // through `with_sink_for` (the `otlp_trio_from_env()`
309            // builder in obs-otel returns sinks ready to hand in).
310            // Stdout fallback keeps emits visible when no override is
311            // supplied.
312        }
313        ServicePreset::InMemory => unreachable!("handled in install()"),
314    }
315
316    // Fallback.
317    let fallback = b
318        .sink_fallback
319        .clone()
320        .unwrap_or_else(|| Arc::new(StdoutSink::default()) as Arc<dyn Sink>);
321    builder = builder.sink_fallback(fallback);
322
323    // Overrides (composed on top of preset).
324    for (tier, sink) in &b.sink_overrides {
325        builder = builder.sink_for(*tier, Arc::clone(sink));
326    }
327
328    builder.build().map_err(InitError::Build)
329}
330
331async fn load_config(path: &std::path::Path) -> Result<EventsConfig, InitError> {
332    let bytes = tokio::fs::read_to_string(path)
333        .await
334        .map_err(|source| InitError::ConfigRead {
335            path: path.to_path_buf(),
336            source,
337        })?;
338    EventsConfig::from_yaml_str(&bytes).map_err(|source| InitError::ConfigParse {
339        path: path.to_path_buf(),
340        source,
341    })
342}
343
/// Install the `tracing → obs` bridge with `filter`, at most once per
/// process.
///
/// `obs_tracing_bridge::init` returns `Err` on a second install, so a
/// process-wide flag makes repeated `init_for_service` calls (tests,
/// dev rebuilds) idempotent: once an install has succeeded, later
/// calls return `Ok(())` without touching the bridge (and their
/// `filter` is ignored).
///
/// The flag is only set after a *successful* install. A failed
/// attempt is reported to the caller and leaves the flag clear, so the
/// next call tries again instead of silently reporting success for a
/// bridge that was never installed.
///
/// # Errors
///
/// Returns `InitError::TracingBridge` carrying the error reported by
/// `obs_tracing_bridge::init`.
#[cfg(feature = "tracing-bridge")]
fn install_tracing_bridge(filter: &str) -> Result<(), InitError> {
    use std::sync::{Mutex, PoisonError};

    static INSTALLED: Mutex<bool> = Mutex::new(false);

    // Hold the lock across `init` so concurrent callers cannot both
    // attempt the install. A poisoned lock only means an earlier
    // attempt panicked mid-install; the flag itself is still valid.
    let mut installed = INSTALLED.lock().unwrap_or_else(PoisonError::into_inner);
    if *installed {
        return Ok(());
    }

    obs_tracing_bridge::init(Some(filter)).map_err(InitError::TracingBridge)?;
    *installed = true;
    Ok(())
}
360
361#[cfg(unix)]
362fn spawn_sighup_reload(path: PathBuf) {
363    use tokio::signal::unix::{SignalKind, signal};
364
365    tokio::spawn(async move {
366        let Ok(mut sig) = signal(SignalKind::hangup()) else {
367            // Can't install SIGHUP on this platform — silently skip.
368            // Emit-at-noise-level is not available here (no observer-
369            // side self-event path) and initd's typical containerised
370            // host supports SIGHUP, so the practical failure rate is
371            // zero.
372            return;
373        };
374        while sig.recv().await.is_some() {
375            if let Ok(bytes) = tokio::fs::read_to_string(&path).await
376                && let Ok(next) = EventsConfig::from_yaml_str(&bytes)
377                && let Some(std_obs) = observer_as_standard()
378            {
379                let _ = std_obs.reload_config(next);
380            }
381        }
382    });
383}
384
385/// Downcast the global observer to `StandardObserver` so the SIGHUP
386/// handler can hot-reload its config. Returns `None` when the
387/// installed observer is something else (InMemoryObserver in tests,
388/// a bespoke observer under `with_observer_task`). The handler
389/// silently no-ops in that case — nothing to reload.
390#[cfg(unix)]
391fn observer_as_standard() -> Option<Arc<StandardObserver>> {
392    // `observer()` returns an `Arc<dyn Observer>` — we can't downcast
393    // through the trait object without keeping a concrete handle on
394    // the side. The init path that installs `StandardObserver`
395    // doesn't stash one, so for now the SIGHUP reloader is a best-
396    // effort facility; consumers that need guaranteed reload wire
397    // their own path on top of `StandardObserver::reload_config`.
398    //
399    // Returning None here means the SIGHUP task becomes a no-op when
400    // a non-standard observer is installed — which is the right
401    // semantic: nothing to reload. A future extension can stash a
402    // `WeakObserver<StandardObserver>` at install time.
403    let _ = observer();
404    None
405}
406
/// Guard handed back by [`InitBuilder::install`]. Dropping it calls the
/// global observer's `shutdown_blocking` with the budget stored here,
/// giving in-flight envelopes a bounded window to flush.
#[must_use = "dropping the guard drains the observer — keep it alive for the \
              lifetime of the process"]
#[derive(Debug)]
pub struct InitGuard {
    /// Upper bound passed to `shutdown_blocking` when the guard drops.
    shutdown_budget: Duration,
}
416
417impl Drop for InitGuard {
418    fn drop(&mut self) {
419        observer().shutdown_blocking(self.shutdown_budget);
420    }
421}
422
423/// Errors from [`InitBuilder::install`].
424#[derive(Debug, thiserror::Error)]
425#[non_exhaustive]
426pub enum InitError {
427    /// The config file could not be read.
428    #[error("read obs config `{}`: {source}", path.display())]
429    ConfigRead {
430        /// Path the builder attempted to read.
431        path: PathBuf,
432        /// Underlying IO error.
433        #[source]
434        source: std::io::Error,
435    },
436    /// The config file parsed but failed obs's YAML validation.
437    #[error("parse obs config `{}`: {source}", path.display())]
438    ConfigParse {
439        /// Path the builder attempted to parse.
440        path: PathBuf,
441        /// Underlying config error.
442        #[source]
443        source: obs_core::config::ConfigError,
444    },
445    /// The `StandardObserver` builder rejected the assembled config.
446    #[error("build observer: {0}")]
447    Build(#[from] obs_core::observer::BuildError),
448    /// The tracing bridge returned an install-time error.
449    #[cfg(feature = "tracing-bridge")]
450    #[error("install tracing bridge: {0}")]
451    TracingBridge(String),
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the documented defaults.
    #[test]
    fn test_builder_defaults_are_sensible() {
        let builder = init_for_service("svc", "0.1.0");

        assert_eq!(builder.service, "svc");
        assert_eq!(builder.version, "0.1.0");
        assert_eq!(builder.preset, ServicePreset::Dev);
        assert!(builder.panic_hook);
        assert_eq!(builder.shutdown_budget, DEFAULT_SHUTDOWN_BUDGET);
        assert!(builder.sink_overrides.is_empty());
    }

    /// `ServicePreset::InMemory` installs with neither a config path
    /// nor any sink overrides.
    #[tokio::test]
    async fn test_install_in_memory_preset_wires_in_memory_observer() {
        let builder = init_for_service("svc", "0.1.0")
            .preset(ServicePreset::InMemory)
            .with_panic_hook(false);

        let guard = builder.install().await.expect("install");

        // Dropping the guard drains the (empty) queue.
        drop(guard);
    }

    /// The Dev preset builds from the default config when no path is
    /// given.
    #[tokio::test]
    async fn test_install_dev_preset_builds_without_config_path() {
        let builder = init_for_service("svc", "0.1.0")
            .preset(ServicePreset::Dev)
            .with_panic_hook(false);

        let guard = builder.install().await.expect("install");

        drop(guard);
    }

    /// An unreadable config path surfaces as `InitError::ConfigRead`.
    #[tokio::test]
    async fn test_install_returns_config_read_error_for_missing_path() {
        let outcome = init_for_service("svc", "0.1.0")
            .preset(ServicePreset::Dev)
            .with_panic_hook(false)
            .config_path("/definitely/does/not/exist.yaml")
            .install()
            .await;

        let err = outcome.expect_err("missing path must fail");
        assert!(matches!(err, InitError::ConfigRead { .. }));
    }
}