obs_kit/init.rs
1//! `init_for_service` — one-call bootstrap for obs consumers.
2//!
3//! Boundary-review § 3.6 / § 4.3. Collapses the 200-300 LoC
4//! "constructor-to-running-observer" path every consumer was
5//! re-discovering: config load → observer build → install → panic hook
6//! → tracing bridge → optional SIGHUP reload → RAII drain-on-drop.
7//!
8//! A typical `main.rs` now looks like:
9//!
10//! ```no_run
11//! use obs_kit::{ServicePreset, init_for_service};
12//!
13//! # async fn demo() -> anyhow::Result<()> {
14//! let _obs = init_for_service("my-service", env!("CARGO_PKG_VERSION"))
15//! .instance("pod-abc123")
16//! .config_path("/etc/my-service/obs.yaml")
17//! .preset(ServicePreset::Production)
18//! .install()
19//! .await?;
20//! # Ok(()) }
21//! ```
22//!
23//! Consumers that want per-tier overrides (fan-out, live-tail mirror)
24//! compose with [`InitBuilder::with_sink_for`]:
25//!
26//! ```no_run
27//! # use std::sync::Arc;
28//! use obs_kit::{
29//! FanOutSink, NdjsonFileSink, RollingFileWriterBuilder, RollingPolicy, Sink, Tier,
30//! };
31//! # use obs_kit::{ServicePreset, init_for_service};
32//! #
33//! # async fn demo() -> anyhow::Result<()> {
34//! let writer = RollingFileWriterBuilder::default()
35//! .directory("./.tok-dev")
36//! .filename_prefix("audit")
37//! .filename_suffix(".ndjson")
38//! .policy(RollingPolicy::Daily)
39//! .build()?;
40//! let audit: Arc<dyn Sink> = Arc::new(NdjsonFileSink::new(writer));
41//! let _obs = init_for_service("my-service", env!("CARGO_PKG_VERSION"))
42//! .preset(ServicePreset::Dev)
43//! .with_sink_for(Tier::Audit, audit)
44//! .install()
45//! .await?;
46//! # Ok(()) }
47//! ```
48
49use std::{path::PathBuf, sync::Arc, time::Duration};
50
51use obs_core::{
52 EventsConfig, FormatterStyle, InMemoryObserver, Sink, StandardObserver, StdoutSink, Tier,
53 install_observer, install_panic_hook, observer,
54};
55
/// Default time budget for the drop-time drain of the observer queue.
///
/// The bound gives in-flight envelopes a window to flush without
/// letting the process hang on a wedged sink. Matches the tok
/// `TokObsGuard` default.
const DEFAULT_SHUTDOWN_BUDGET: Duration = Duration::from_millis(250);
60
/// Built-in service presets.
///
/// A preset only decides which sinks the bootstrap binds by itself.
/// Anything beyond that (custom sinks, fan-out, OTLP, audit spools)
/// composes on top via [`InitBuilder::with_sink_for`] and
/// [`InitBuilder::sink_fallback`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ServicePreset {
    /// Compact-style stdout sink bound to LOG, METRIC and TRACE.
    /// AUDIT gets no dedicated binding, so it is left to the fallback
    /// sink (a default-styled `StdoutSink` unless
    /// [`InitBuilder::sink_fallback`] replaces it). No OTLP. This is
    /// the default — the shape local dev and CI smoke tests want out
    /// of the box.
    #[default]
    Dev,
    /// Production wiring. The preset binds no per-tier sinks by
    /// itself: OTLP lives in the separate obs-otel crate, so callers
    /// hand those sinks in via [`InitBuilder::with_sink_for`]. Until
    /// they do, every tier is left to the fallback sink (stdout by
    /// default). AUDIT is likewise unwired — services with a
    /// compliance story set it explicitly.
    Production,
    /// Tests: installs an [`InMemoryObserver`] directly. Sink
    /// overrides and the fallback sink configured on the builder are
    /// ignored. `assert_emitted!` and `InMemoryHandle::drain` read
    /// the captured stream.
    InMemory,
}
84
85/// Builder returned by [`init_for_service`]. Configure the service,
86/// preset, and any per-tier sink overrides; finish with
87/// [`Self::install`].
88#[must_use = "call .install() to apply the configuration"]
89pub struct InitBuilder {
90 service: String,
91 version: String,
92 instance: Option<String>,
93 config_path: Option<PathBuf>,
94 config: Option<EventsConfig>,
95 preset: ServicePreset,
96 panic_hook: bool,
97 #[cfg(feature = "tracing-bridge")]
98 tracing_bridge_filter: Option<String>,
99 #[cfg(unix)]
100 sighup_reload: bool,
101 sink_overrides: Vec<(Tier, Arc<dyn Sink>)>,
102 sink_fallback: Option<Arc<dyn Sink>>,
103 shutdown_budget: Duration,
104}
105
106impl std::fmt::Debug for InitBuilder {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("InitBuilder")
109 .field("service", &self.service)
110 .field("version", &self.version)
111 .field("instance", &self.instance)
112 .field("config_path", &self.config_path)
113 .field("preset", &self.preset)
114 .field("panic_hook", &self.panic_hook)
115 .field("sink_override_count", &self.sink_overrides.len())
116 .field("has_fallback", &self.sink_fallback.is_some())
117 .field("shutdown_budget", &self.shutdown_budget)
118 .finish_non_exhaustive()
119 }
120}
121
122/// Entry point. `service` and `version` become the `service.name` /
123/// `service.version` resource attributes on every envelope.
124pub fn init_for_service(service: impl Into<String>, version: impl Into<String>) -> InitBuilder {
125 InitBuilder {
126 service: service.into(),
127 version: version.into(),
128 instance: None,
129 config_path: None,
130 config: None,
131 preset: ServicePreset::default(),
132 panic_hook: true,
133 #[cfg(feature = "tracing-bridge")]
134 tracing_bridge_filter: None,
135 #[cfg(unix)]
136 sighup_reload: false,
137 sink_overrides: Vec::new(),
138 sink_fallback: None,
139 shutdown_budget: DEFAULT_SHUTDOWN_BUDGET,
140 }
141}
142
143impl InitBuilder {
144 /// Override the instance identity (hostname, pod id, VM id). When
145 /// unset, `OTEL_SERVICE_INSTANCE_ID` env is consulted, then empty.
146 pub fn instance(mut self, id: impl Into<String>) -> Self {
147 self.instance = Some(id.into());
148 self
149 }
150
151 /// Load `EventsConfig` from `path`. YAML root must parse as
152 /// [`EventsConfig`] — typos surface with the keys-hint from
153 /// [`EventsConfig::from_yaml_str`].
154 pub fn config_path(mut self, path: impl Into<PathBuf>) -> Self {
155 self.config_path = Some(path.into());
156 self
157 }
158
159 /// Supply an [`EventsConfig`] directly — bypasses `config_path`.
160 /// Useful when the consumer already owns the config struct.
161 pub fn config(mut self, cfg: EventsConfig) -> Self {
162 self.config = Some(cfg);
163 self
164 }
165
166 /// Select a preset (default: [`ServicePreset::Dev`]).
167 pub fn preset(mut self, preset: ServicePreset) -> Self {
168 self.preset = preset;
169 self
170 }
171
172 /// Override the default shutdown drain budget (default 250 ms).
173 pub fn shutdown_budget(mut self, budget: Duration) -> Self {
174 self.shutdown_budget = budget;
175 self
176 }
177
178 /// Disable the panic hook (installed by default).
179 pub fn with_panic_hook(mut self, enabled: bool) -> Self {
180 self.panic_hook = enabled;
181 self
182 }
183
184 /// Install the `tracing → obs` bridge with the supplied filter
185 /// directive (`RUST_LOG` shape). Requires the `tracing-bridge`
186 /// feature on `obs-kit`.
187 #[cfg(feature = "tracing-bridge")]
188 pub fn with_tracing_bridge(mut self, filter: impl Into<String>) -> Self {
189 self.tracing_bridge_filter = Some(filter.into());
190 self
191 }
192
193 /// Spawn a SIGHUP handler that re-parses `config_path` and calls
194 /// `StandardObserver::reload_config`. Only takes effect when
195 /// `config_path` is set. Unix-only.
196 #[cfg(unix)]
197 pub fn with_sighup_reload(mut self, enabled: bool) -> Self {
198 self.sighup_reload = enabled;
199 self
200 }
201
202 /// Attach an additional sink for `tier`. Called before
203 /// [`Self::install`]; composes on top of the chosen preset. To fan
204 /// out to multiple sinks on the same tier, construct a
205 /// [`obs_core::FanOutSink`] and pass it here.
206 pub fn with_sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
207 self.sink_overrides.push((tier, sink));
208 self
209 }
210
211 /// Override the fallback sink (the sink tiers without a specific
212 /// binding route to). When unset, defaults to
213 /// [`StdoutSink::default`] for Dev, none for Production, and
214 /// unused for InMemory.
215 pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
216 self.sink_fallback = Some(sink);
217 self
218 }
219
220 /// Build the observer, install it, install the panic hook (when
221 /// enabled), install the tracing bridge (when configured), spawn
222 /// the SIGHUP task (when configured), and return an RAII guard
223 /// that drains the queue on drop.
224 ///
225 /// # Errors
226 ///
227 /// Returns `anyhow::Error` when the config file cannot be read,
228 /// when parsing / validation fails, when the observer builder
229 /// rejects the config, or when the tracing bridge has already been
230 /// installed.
231 pub async fn install(self) -> Result<InitGuard, InitError> {
232 // 1. Resolve the config source.
233 let config = match (self.config.clone(), self.config_path.as_ref()) {
234 (Some(cfg), _) => cfg,
235 (None, Some(path)) => load_config(path).await?,
236 (None, None) => EventsConfig::default(),
237 };
238
239 // 2. Route on preset.
240 match self.preset {
241 ServicePreset::InMemory => {
242 install_observer(InMemoryObserver::new());
243 }
244 preset => {
245 let observer = build_observer(&self, preset, config.clone())?;
246 install_observer(observer);
247 }
248 }
249
250 // 3. Panic hook.
251 if self.panic_hook {
252 install_panic_hook();
253 }
254
255 // 4. Tracing bridge (feature-gated).
256 #[cfg(feature = "tracing-bridge")]
257 if let Some(ref filter) = self.tracing_bridge_filter {
258 install_tracing_bridge(filter)?;
259 }
260
261 // 5. SIGHUP reload.
262 #[cfg(unix)]
263 if self.sighup_reload
264 && let Some(path) = self.config_path.clone()
265 {
266 spawn_sighup_reload(path);
267 }
268
269 Ok(InitGuard {
270 shutdown_budget: self.shutdown_budget,
271 })
272 }
273}
274
275fn build_observer(
276 b: &InitBuilder,
277 preset: ServicePreset,
278 config: EventsConfig,
279) -> Result<StandardObserver, InitError> {
280 let instance = b
281 .instance
282 .clone()
283 .or_else(|| std::env::var("OTEL_SERVICE_INSTANCE_ID").ok())
284 .unwrap_or_default();
285
286 let mut builder = StandardObserver::builder()
287 .service(b.service.clone(), b.version.clone())
288 .instance(instance)
289 .config(config);
290
291 // Preset baseline.
292 match preset {
293 ServicePreset::Dev => {
294 let compact: Arc<dyn Sink> = Arc::new(StdoutSink::new(FormatterStyle::Compact));
295 builder = builder
296 .sink_for(Tier::Log, Arc::clone(&compact))
297 .sink_for(Tier::Metric, Arc::clone(&compact))
298 .sink_for(Tier::Trace, Arc::clone(&compact));
299 // AUDIT: unwired — stdout is the wrong place for audit
300 // records and the preset declines to guess. Callers supply
301 // a concrete audit sink via `with_sink_for(Tier::Audit, …)`.
302 }
303 ServicePreset::Production => {
304 // No OTLP wiring here — obs-otel is a separate crate, and
305 // forcing the feature-graph into obs-kit's init path would
306 // either bloat the default build or leak feature flags out
307 // of the façade. Production callers wire OTLP sinks
308 // through `with_sink_for` (the `otlp_trio_from_env()`
309 // builder in obs-otel returns sinks ready to hand in).
310 // Stdout fallback keeps emits visible when no override is
311 // supplied.
312 }
313 ServicePreset::InMemory => unreachable!("handled in install()"),
314 }
315
316 // Fallback.
317 let fallback = b
318 .sink_fallback
319 .clone()
320 .unwrap_or_else(|| Arc::new(StdoutSink::default()) as Arc<dyn Sink>);
321 builder = builder.sink_fallback(fallback);
322
323 // Overrides (composed on top of preset).
324 for (tier, sink) in &b.sink_overrides {
325 builder = builder.sink_for(*tier, Arc::clone(sink));
326 }
327
328 builder.build().map_err(InitError::Build)
329}
330
331async fn load_config(path: &std::path::Path) -> Result<EventsConfig, InitError> {
332 let bytes = tokio::fs::read_to_string(path)
333 .await
334 .map_err(|source| InitError::ConfigRead {
335 path: path.to_path_buf(),
336 source,
337 })?;
338 EventsConfig::from_yaml_str(&bytes).map_err(|source| InitError::ConfigParse {
339 path: path.to_path_buf(),
340 source,
341 })
342}
343
/// Install the `tracing → obs` bridge once per process.
///
/// `obs_tracing_bridge::init` returns `Err` on a second install, so
/// repeated `init_for_service` calls (tests, dev rebuilds) are made
/// idempotent here: once an install has succeeded, later calls return
/// `Ok(())` without touching the bridge again.
///
/// Only a *successful* install is remembered. If the bridge fails to
/// install, the error is returned and the next call tries again,
/// rather than reporting success for a bridge that was never set up.
///
/// # Errors
///
/// Returns [`InitError::TracingBridge`] when `obs_tracing_bridge::init`
/// fails.
#[cfg(feature = "tracing-bridge")]
fn install_tracing_bridge(filter: &str) -> Result<(), InitError> {
    // The mutex serialises concurrent callers so only one of them
    // runs `init`; the flag records whether that attempt succeeded.
    static INSTALLED: std::sync::Mutex<bool> = std::sync::Mutex::new(false);

    // A poisoned lock still holds a valid bool, so recover the guard.
    let mut installed = INSTALLED
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if *installed {
        return Ok(());
    }

    obs_tracing_bridge::init(Some(filter)).map_err(InitError::TracingBridge)?;
    *installed = true;
    Ok(())
}
360
361#[cfg(unix)]
362fn spawn_sighup_reload(path: PathBuf) {
363 use tokio::signal::unix::{SignalKind, signal};
364
365 tokio::spawn(async move {
366 let Ok(mut sig) = signal(SignalKind::hangup()) else {
367 // Can't install SIGHUP on this platform — silently skip.
368 // Emit-at-noise-level is not available here (no observer-
369 // side self-event path) and initd's typical containerised
370 // host supports SIGHUP, so the practical failure rate is
371 // zero.
372 return;
373 };
374 while sig.recv().await.is_some() {
375 if let Ok(bytes) = tokio::fs::read_to_string(&path).await
376 && let Ok(next) = EventsConfig::from_yaml_str(&bytes)
377 && let Some(std_obs) = observer_as_standard()
378 {
379 let _ = std_obs.reload_config(next);
380 }
381 }
382 });
383}
384
385/// Downcast the global observer to `StandardObserver` so the SIGHUP
386/// handler can hot-reload its config. Returns `None` when the
387/// installed observer is something else (InMemoryObserver in tests,
388/// a bespoke observer under `with_observer_task`). The handler
389/// silently no-ops in that case — nothing to reload.
390#[cfg(unix)]
391fn observer_as_standard() -> Option<Arc<StandardObserver>> {
392 // `observer()` returns an `Arc<dyn Observer>` — we can't downcast
393 // through the trait object without keeping a concrete handle on
394 // the side. The init path that installs `StandardObserver`
395 // doesn't stash one, so for now the SIGHUP reloader is a best-
396 // effort facility; consumers that need guaranteed reload wire
397 // their own path on top of `StandardObserver::reload_config`.
398 //
399 // Returning None here means the SIGHUP task becomes a no-op when
400 // a non-standard observer is installed — which is the right
401 // semantic: nothing to reload. A future extension can stash a
402 // `WeakObserver<StandardObserver>` at install time.
403 let _ = observer();
404 None
405}
406
407/// RAII guard returned by [`InitBuilder::install`]. On drop, calls the
408/// global observer's `shutdown_blocking` with the configured budget so
409/// in-flight envelopes have a bounded window to flush.
410#[must_use = "dropping the guard drains the observer — keep it alive for the lifetime of the \
411 process"]
412#[derive(Debug)]
413pub struct InitGuard {
414 shutdown_budget: Duration,
415}
416
417impl Drop for InitGuard {
418 fn drop(&mut self) {
419 observer().shutdown_blocking(self.shutdown_budget);
420 }
421}
422
423/// Errors from [`InitBuilder::install`].
424#[derive(Debug, thiserror::Error)]
425#[non_exhaustive]
426pub enum InitError {
427 /// The config file could not be read.
428 #[error("read obs config `{}`: {source}", path.display())]
429 ConfigRead {
430 /// Path the builder attempted to read.
431 path: PathBuf,
432 /// Underlying IO error.
433 #[source]
434 source: std::io::Error,
435 },
436 /// The config file parsed but failed obs's YAML validation.
437 #[error("parse obs config `{}`: {source}", path.display())]
438 ConfigParse {
439 /// Path the builder attempted to parse.
440 path: PathBuf,
441 /// Underlying config error.
442 #[source]
443 source: obs_core::config::ConfigError,
444 },
445 /// The `StandardObserver` builder rejected the assembled config.
446 #[error("build observer: {0}")]
447 Build(#[from] obs_core::observer::BuildError),
448 /// The tracing bridge returned an install-time error.
449 #[cfg(feature = "tracing-bridge")]
450 #[error("install tracing bridge: {0}")]
451 TracingBridge(String),
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder for the install tests: fixed identity, the requested
    /// preset, and no panic hook so the test process's hook is left
    /// alone.
    fn quiet_builder(preset: ServicePreset) -> InitBuilder {
        init_for_service("svc", "0.1.0")
            .preset(preset)
            .with_panic_hook(false)
    }

    #[test]
    fn test_builder_defaults_are_sensible() {
        let InitBuilder {
            service,
            version,
            preset,
            panic_hook,
            shutdown_budget,
            sink_overrides,
            ..
        } = init_for_service("svc", "0.1.0");

        assert_eq!(service, "svc");
        assert_eq!(version, "0.1.0");
        assert_eq!(preset, ServicePreset::Dev);
        assert!(panic_hook);
        assert_eq!(shutdown_budget, DEFAULT_SHUTDOWN_BUDGET);
        assert!(sink_overrides.is_empty());
    }

    #[tokio::test]
    async fn test_install_in_memory_preset_wires_in_memory_observer() {
        // `ServicePreset::InMemory` must install without a config
        // path or any sink overrides.
        let installed = quiet_builder(ServicePreset::InMemory).install().await;
        let guard = installed.expect("install");
        // Dropping the guard drains the (empty) queue.
        drop(guard);
    }

    #[tokio::test]
    async fn test_install_dev_preset_builds_without_config_path() {
        let installed = quiet_builder(ServicePreset::Dev).install().await;
        let guard = installed.expect("install");
        drop(guard);
    }

    #[tokio::test]
    async fn test_install_returns_config_read_error_for_missing_path() {
        let outcome = quiet_builder(ServicePreset::Dev)
            .config_path("/definitely/does/not/exist.yaml")
            .install()
            .await;
        let err = outcome.expect_err("missing path must fail");
        assert!(matches!(err, InitError::ConfigRead { .. }));
    }
}
506}