Skip to main content

obs_core/observer/
standard.rs

1//! `StandardObserver` — production-ready observer with per-tier
2//! workers, AUDIT spool, head sampler, scope auto-fill, and live
3//! config reload. Spec 11 §§ 3, 4, 6.4 + spec 13 §§ 2, 6.
4
5use std::sync::{
6    Arc,
7    atomic::{AtomicU32, Ordering},
8};
9
10use arc_swap::ArcSwap;
11use bytes;
12use obs_proto::obs::v1::{ObsEnvelope, SamplingReason as PSamplingReason, Tier};
13use parking_lot::Mutex;
14
15use super::{
16    Observer,
17    workers::{TierWorker, WorkerCounters, note_channel_full, spawn_tier_worker},
18};
19use crate::{
20    audit_spool::SpoolWriter,
21    callsite::ObsCallsite,
22    config::{AuditFailureMode, EventsConfig},
23    filter::Filter,
24    registry::{ObsCallsiteRegistry, SchemaRegistry, ScrubbedEnvelope},
25    resource::ResourceAttrs,
26    sampling::{SamplingDecision, decide as sample_decide},
27    scope::{auto_fill_envelope, inbound_traceparent_sampled, push_tail_buffer},
28    sink::{NoopSink, Sink, SinkFut, StdoutSink},
29};
30
31/// Tier-matching dispatcher. One sink slot per tier plus a fallback.
32#[derive(Default)]
33struct SinkRouter {
34    log: Option<Arc<dyn Sink>>,
35    metric: Option<Arc<dyn Sink>>,
36    trace: Option<Arc<dyn Sink>>,
37    audit: Option<Arc<dyn Sink>>,
38    fallback: Option<Arc<dyn Sink>>,
39}
40
41impl std::fmt::Debug for SinkRouter {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("SinkRouter")
44            .field("log", &self.log.as_ref().map(|_| "..."))
45            .field("metric", &self.metric.as_ref().map(|_| "..."))
46            .field("trace", &self.trace.as_ref().map(|_| "..."))
47            .field("audit", &self.audit.as_ref().map(|_| "..."))
48            .field("fallback", &self.fallback.as_ref().map(|_| "..."))
49            .finish()
50    }
51}
52
53impl SinkRouter {
54    fn for_tier(&self, tier: Tier) -> Option<&Arc<dyn Sink>> {
55        let primary = match tier {
56            Tier::Log => self.log.as_ref(),
57            Tier::Metric => self.metric.as_ref(),
58            Tier::Trace => self.trace.as_ref(),
59            Tier::Audit => self.audit.as_ref(),
60            _ => None,
61        };
62        primary.or(self.fallback.as_ref())
63    }
64}
65
66/// Worker handles, indexed by tier; AUDIT is special (spool path).
67#[derive(Debug, Default)]
68struct WorkerPool {
69    log: Option<TierWorker>,
70    metric: Option<TierWorker>,
71    trace: Option<TierWorker>,
72    audit: Option<TierWorker>,
73}
74
75/// Production-ready observer with reloadable config and per-tier
76/// worker pool.
77pub struct StandardObserver {
78    router: SinkRouter,
79    workers: WorkerPool,
80    spool: Option<Arc<SpoolWriter>>,
81    registry: Arc<SchemaRegistry>,
82    callsites: Arc<ObsCallsiteRegistry>,
83    config: ArcSwap<EventsConfig>,
84    filter: ArcSwap<Filter>,
85    /// Workspace-shared OTel resource attribute set; sinks read the
86    /// snapshot at flush time. Spec 20 § 2.1 / spec 94 § 2.7 / P1-E.
87    resource: ArcSwap<ResourceAttrs>,
88    counters: Arc<WorkerCounters>,
89    generation: AtomicU32,
90    service: String,
91    instance: String,
92    version: String,
93    /// Synchronous fallback for environments without a tokio runtime
94    /// (tests, CLI tools): protects in-thread sink dispatch.
95    sync_dispatch_lock: Mutex<()>,
96}
97
98impl std::fmt::Debug for StandardObserver {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("StandardObserver")
101            .field("schemas", &self.registry.len())
102            .field("service", &self.service)
103            .field("instance", &self.instance)
104            .field("version", &self.version)
105            .field("generation", &self.generation.load(Ordering::Relaxed))
106            .finish_non_exhaustive()
107    }
108}
109
110impl StandardObserver {
111    /// Builder entry.
112    #[must_use]
113    pub fn builder() -> StandardObserverBuilder {
114        StandardObserverBuilder::default()
115    }
116
117    /// Convenience: `StandardObserver` with `StdoutSink(Full)` as
118    /// fallback.
119    ///
120    /// # Errors
121    ///
122    /// Returns `BuildError` if config validation fails.
123    pub fn dev() -> Result<Self, BuildError> {
124        Self::builder()
125            .service("dev", env!("CARGO_PKG_VERSION"))
126            .sink_fallback(Arc::new(StdoutSink::default()))
127            .build()
128    }
129
130    /// Read-only access to the registry (used by sinks).
131    #[must_use]
132    pub fn registry(&self) -> Arc<SchemaRegistry> {
133        Arc::clone(&self.registry)
134    }
135
136    /// Read-only access to the per-process callsite registry. Used by
137    /// the bridge (Direction A inserts; Direction B reads to
138    /// reconstitute `tracing::Metadata`). Spec 31 § 3.2.
139    #[must_use]
140    pub fn callsites(&self) -> Arc<ObsCallsiteRegistry> {
141        Arc::clone(&self.callsites)
142    }
143
144    /// Read-only access to the live config.
145    #[must_use]
146    pub fn config(&self) -> arc_swap::Guard<Arc<EventsConfig>> {
147        self.config.load()
148    }
149
150    /// Atomically swap the resource attribute set. Sinks pick up the
151    /// new value on the next [`Observer::resource_attrs`] call.
152    /// Spec 20 § 2.1 / spec 94 § 2.7 / P1-E.
153    pub fn set_resource_attrs(&self, attrs: ResourceAttrs) {
154        self.resource.store(Arc::new(attrs));
155    }
156
157    /// Atomically swap the config and bump the generation so all
158    /// callsites re-probe. Spec 11 § 3.2.
159    ///
160    /// # Errors
161    ///
162    /// Returns `BuildError::InvalidConfig` if validation rejects
163    /// `new_config`.
164    pub fn reload_config(&self, new_config: EventsConfig) -> Result<(), BuildError> {
165        if let Err(e) = new_config.validate() {
166            crate::self_events::emit_config_reload_failed(&format!("validate: {e}"));
167            return Err(BuildError::InvalidConfig(e));
168        }
169        if let Some(spec) = new_config.filter.as_deref() {
170            match Filter::parse(spec) {
171                Ok(parsed) => self.filter.store(Arc::new(parsed)),
172                Err(e) => {
173                    crate::self_events::emit_config_reload_failed(&format!("filter: {e}"));
174                    return Err(BuildError::InvalidConfig(
175                        crate::config::ConfigError::invalid_range("filter", format!("{e}")),
176                    ));
177                }
178            }
179        } else {
180            self.filter.store(Arc::new(Filter::new()));
181        }
182        // Cheap rolling hash so operators can correlate
183        // `ObsConfigReloaded` events with config-file generations
184        // without leaking field values into labels.
185        let cfg_hash = config_hash(&new_config);
186        self.config.store(Arc::new(new_config));
187        self.generation.fetch_add(1, Ordering::Release);
188        crate::self_events::emit_config_reloaded(cfg_hash);
189        Ok(())
190    }
191
192    /// Read-only access to the live filter.
193    #[must_use]
194    pub fn filter(&self) -> Arc<Filter> {
195        self.filter.load_full()
196    }
197
198    /// Worker counters surface for tests + diagnostics.
199    #[must_use]
200    pub fn counters(&self) -> Arc<WorkerCounters> {
201        Arc::clone(&self.counters)
202    }
203
204    fn fill_identity(&self, env: &mut ObsEnvelope) {
205        if env.service.is_empty() {
206            env.service.clone_from(&self.service);
207        }
208        if env.instance.is_empty() {
209            env.instance.clone_from(&self.instance);
210        }
211        if env.version.is_empty() {
212            env.version.clone_from(&self.version);
213        }
214    }
215
216    fn dispatch_sync(&self, env: ObsEnvelope, tier: Tier) {
217        // No tokio runtime ⇒ deliver in-emit-thread. The spec's
218        // "scrubber on the worker thread" rule (spec 11 § 4.1) still
219        // applies: sinks must never see an unscrubbed envelope, so we
220        // run the scrubber here and then dispatch.
221        let _g = self.sync_dispatch_lock.lock();
222        let Some(sink) = self.router.for_tier(tier) else {
223            return;
224        };
225        let mut scratch = bytes::BytesMut::with_capacity(env.payload.len());
226        let scrubbed = match ScrubbedEnvelope::scrub(&env, &self.registry, &mut scratch) {
227            Ok(s) => s,
228            Err(_) => return,
229        };
230        sink.deliver(scrubbed);
231    }
232
233    fn dispatch_async(&self, env: ObsEnvelope, tier: Tier) {
234        let worker = match tier {
235            Tier::Log => self.workers.log.as_ref(),
236            Tier::Metric => self.workers.metric.as_ref(),
237            Tier::Trace => self.workers.trace.as_ref(),
238            Tier::Audit => self.workers.audit.as_ref(),
239            _ => None,
240        };
241        let Some(worker) = worker else {
242            // No worker (no tokio runtime) — fall back to sync.
243            self.dispatch_sync(env, tier);
244            return;
245        };
246        if tier == Tier::Audit {
247            self.dispatch_audit(worker, env);
248        } else {
249            match worker.try_send(env) {
250                Ok(()) => {}
251                Err(_dropped) => {
252                    note_channel_full(&self.counters, tier);
253                }
254            }
255        }
256    }
257
258    fn dispatch_audit(&self, worker: &TierWorker, env: ObsEnvelope) {
259        let cfg = self.config.load();
260        let block_ms = u64::from(cfg.audit.block_ms_max);
261        // First try a non-blocking send; if it succeeds, we're done.
262        let mut env_unsent = match worker.try_send(env) {
263            Ok(()) => return,
264            Err(env) => env,
265        };
266        // Fall back to bounded busy-wait with the configured timeout.
267        // We deliberately do NOT call `Handle::block_on` here — that
268        // panics when the caller is already inside a runtime. Instead,
269        // poll `try_send` with a short sleep to honour the documented
270        // "bounded blocking" semantics (spec 11 § 6.4) without relying
271        // on `block_on`. The total wall-clock blocking is bounded by
272        // `audit.block_ms_max`.
273        let started = std::time::Instant::now();
274        let interval = std::time::Duration::from_millis(2);
275        while started.elapsed().as_millis() < u128::from(block_ms) {
276            match worker.try_send(env_unsent) {
277                Ok(()) => return,
278                Err(env) => env_unsent = env,
279            }
280            std::thread::sleep(interval);
281        }
282        // Channel still full ⇒ spool to disk.
283        if let Some(spool) = self.spool.as_ref() {
284            match spool.append(&env_unsent) {
285                Ok(()) => {
286                    note_channel_full(&self.counters, Tier::Audit);
287                    crate::self_events::emit_audit_spooled(env_unsent.full_name.as_str());
288                }
289                Err(e) => {
290                    crate::self_events::emit_audit_spool_failed(&e.to_string());
291                    self.handle_spool_failure();
292                }
293            }
294        } else {
295            crate::self_events::emit_audit_spool_failed("no spool configured");
296            self.handle_spool_failure();
297        }
298    }
299
300    fn handle_spool_failure(&self) {
301        // The AUDIT-tier failure path is documented as a deliberate
302        // policy escalation (spec 11 § 6.4); the choice between panic /
303        // abort / warn_only is taken from `audit.on_failure`. Allow
304        // `clippy::panic` here because the panic is the documented
305        // escape hatch when the operator picked `Panic` mode.
306        #[allow(clippy::panic)]
307        {
308            let cfg = self.config.load();
309            match cfg.audit.on_failure {
310                AuditFailureMode::Panic => {
311                    panic!("audit spool unwritable; compliance failure")
312                }
313                AuditFailureMode::Abort => std::process::abort(),
314                AuditFailureMode::WarnOnly => {
315                    eprintln!("[obs] AUDIT spool unwritable; envelope dropped (warn_only)");
316                }
317            }
318        }
319    }
320
321    /// Drain any `*.audit.bin` files left in `audit.spool_dir` by a
322    /// prior process. Recovered envelopes are dispatched through the
323    /// AUDIT worker (or the sync fallback if no runtime is alive). One
324    /// `ObsAuditSpoolRecovered` self-event is emitted at the end with
325    /// the total count. Spec 11 § 6.4.
326    fn recover_audit_spool(&self) {
327        let cfg = self.config.load();
328        let dir = cfg.audit.spool_dir.clone();
329        if !dir.exists() {
330            return;
331        }
332        let mut total: u64 = 0;
333        let report = crate::audit_spool::recover(&dir, |env| {
334            total += 1;
335            // Re-enqueue: sync if no worker, async otherwise.
336            if let Some(worker) = self.workers.audit.as_ref() {
337                let _ = worker.try_send(env);
338            } else {
339                self.dispatch_sync(env, Tier::Audit);
340            }
341            Ok(())
342        });
343        if total == 0 {
344            let _ = report;
345            return;
346        }
347        let mut env = ObsEnvelope {
348            full_name: "obs.runtime.v1.ObsAuditSpoolRecovered".to_string(),
349            tier: ::buffa::EnumValue::Known(obs_proto::obs::v1::Tier::TIER_LOG),
350            sev: ::buffa::EnumValue::Known(obs_proto::obs::v1::Severity::SEVERITY_INFO),
351            ..Default::default()
352        };
353        env.labels
354            .insert("record_count".to_string(), total.to_string());
355        // Route directly through this observer (the global may not be
356        // installed yet at builder-time).
357        self.fill_identity(&mut env);
358        self.dispatch_sync(env, Tier::Log);
359    }
360
361    /// Apply scope auto-fill, head sampling, and tail-buffer push to
362    /// `env`. Returns `true` when the envelope should continue down
363    /// the per-tier worker; `false` when it was dropped or counted as
364    /// a buffer push.
365    fn run_emit_pipeline(&self, env: &mut ObsEnvelope, sev: obs_proto::obs::v1::Severity) -> bool {
366        // Step 3 (post-project): auto-fill from scope frame stack.
367        auto_fill_envelope(env);
368        // Step 3.0: enforce `limits.max_payload_bytes` (spec 11 § 6.2 /
369        // spec 93 P2-10). An oversized envelope is dropped and a
370        // `ObsOversizedDropped` self-event records the drop with a
371        // `full_name` label so operators can find the noisy schema.
372        let cfg_pre = self.config.load();
373        let max_bytes = u64::from(cfg_pre.limits.max_payload_bytes);
374        let payload_size = env.payload.len() as u64;
375        if max_bytes > 0 && payload_size > max_bytes {
376            crate::self_events::emit_oversized_dropped(env.full_name.as_str(), payload_size);
377            return false;
378        }
379        // Step 3.1 (spec 11 § 6.2 / spec 94 § 3.5 / P2-E): enforce
380        // `limits.max_label_value_bytes`. A single oversize label value
381        // is treated as DoS-class input — User-Agent floods and
382        // attacker-controlled headers were the motivation. The
383        // envelope is dropped with `reason = "label"` and the offending
384        // label key is recorded in `size_bytes` so operators can grep
385        // by `(full_name, label_name, size)`.
386        let max_label_bytes = u64::from(cfg_pre.limits.max_label_value_bytes);
387        if max_label_bytes > 0 {
388            for (k, v) in &env.labels {
389                if v.len() as u64 > max_label_bytes {
390                    crate::self_events::emit_oversized_label_dropped(
391                        env.full_name.as_str(),
392                        k,
393                        v.len() as u64,
394                    );
395                    return false;
396                }
397            }
398        }
399        // Step 3a: per-emit dynamic filter directive (`[field=value]=level`).
400        // Spec 13 § 7.1 / spec 93 P0-5. Skipped only for sampler bypasses
401        // — those decisions live in `sample_decide`, which honours
402        // `SamplingReason::Forensic / Audit / Override`.
403        let filter = self.filter.load();
404        if !filter.event_allowed(env, sev) {
405            return false;
406        }
407        // Step 4: head sampler. Spec 13 § 6 / spec 93 P2-7: forensic /
408        // audit / override emits bypass the sampler unconditionally.
409        let bypass_sampler = matches!(
410            env.sampling_reason,
411            ::buffa::EnumValue::Known(
412                PSamplingReason::SAMPLING_REASON_FORENSIC
413                    | PSamplingReason::SAMPLING_REASON_AUDIT
414                    | PSamplingReason::SAMPLING_REASON_OVERRIDE,
415            )
416        );
417        if bypass_sampler {
418            return true;
419        }
420        let cfg = self.config.load();
421        let inbound = inbound_traceparent_sampled();
422        match sample_decide(&cfg.sampling, env.full_name.as_str(), sev, inbound) {
423            SamplingDecision::Drop => {
424                return false;
425            }
426            SamplingDecision::Keep => {}
427            SamplingDecision::ParentSet { sampled: true } => {
428                env.sampling_reason =
429                    ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_OVERRIDE);
430            }
431            SamplingDecision::ParentSet { sampled: false } => {
432                return false;
433            }
434        }
435        // Step 5: tail-on-error push (TRACE/DEBUG only).
436        if matches!(
437            sev,
438            obs_proto::obs::v1::Severity::Trace | obs_proto::obs::v1::Severity::Debug
439        ) {
440            push_tail_buffer(env);
441        } else if sev >= obs_proto::obs::v1::Severity::Error {
442            crate::scope::mark_error_on_active_scopes();
443        }
444        true
445    }
446}
447
448impl Observer for StandardObserver {
449    fn emit_envelope(&self, mut env: ObsEnvelope) {
450        self.fill_identity(&mut env);
451        let sev = match env.sev {
452            ::buffa::EnumValue::Known(s) => s,
453            ::buffa::EnumValue::Unknown(_) => obs_proto::obs::v1::Severity::Unspecified,
454        };
455        if !self.run_emit_pipeline(&mut env, sev) {
456            return;
457        }
458        let tier = match env.tier {
459            ::buffa::EnumValue::Known(t) => t,
460            ::buffa::EnumValue::Unknown(_) => Tier::Unspecified,
461        };
462        if let Ok(_h) = tokio::runtime::Handle::try_current() {
463            self.dispatch_async(env, tier);
464        } else {
465            self.dispatch_sync(env, tier);
466        }
467    }
468
469    fn enabled(&self, callsite: &ObsCallsite) -> bool {
470        // Spec 13 § 7 / spec 94 § 2.2: defer the entire decision to
471        // `Filter::callsite_interest`. The earlier `||` shortcut against
472        // the default-level floor silently bypassed `=off` directives
473        // when a callsite's static severity satisfied the global floor.
474        // The filter already handles bare-level / target=level / =off /
475        // dynamic precedence in one place.
476        let filter = self.filter.load();
477        filter.callsite_interest(callsite) != crate::callsite::Interest::Never
478    }
479
480    fn generation(&self) -> u32 {
481        self.generation.load(Ordering::Acquire)
482    }
483
484    fn reload_filter(&self) {
485        self.generation.fetch_add(1, Ordering::Release);
486    }
487
488    fn flush(&self) -> SinkFut<'_> {
489        Box::pin(async move {
490            for w in [
491                self.workers.log.as_ref(),
492                self.workers.metric.as_ref(),
493                self.workers.trace.as_ref(),
494                self.workers.audit.as_ref(),
495            ]
496            .iter()
497            .flatten()
498            {
499                w.flush().await;
500            }
501        })
502    }
503
504    fn shutdown(&self) -> SinkFut<'_> {
505        Box::pin(async move {
506            for w in [
507                self.workers.log.as_ref(),
508                self.workers.metric.as_ref(),
509                self.workers.trace.as_ref(),
510                self.workers.audit.as_ref(),
511            ]
512            .iter()
513            .flatten()
514            {
515                w.shutdown().await;
516            }
517            if let Some(spool) = self.spool.as_ref() {
518                spool.close();
519            }
520        })
521    }
522
523    fn shutdown_blocking(&self, timeout: std::time::Duration) {
524        // Caller contexts the panic-hook + drain helpers must work in:
525        // (1) outside any tokio runtime → spin up a single-thread one
526        //     inline and run the async shutdown to completion;
527        // (2) on a tokio worker thread → use `block_in_place` so we
528        //     can `Handle::block_on` without dead-locking;
529        // (3) on a current-thread runtime → cannot block the active
530        //     executor reentrantly. We log a one-shot warning and
531        //     fall through; callers in this context should `await`
532        //     `Observer::shutdown()` directly. Spec 93 P2 follow-up.
533        match tokio::runtime::Handle::try_current() {
534            Err(_) => {
535                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
536                    .enable_all()
537                    .build()
538                {
539                    let _ = rt.block_on(tokio::time::timeout(timeout, self.shutdown()));
540                }
541            }
542            Ok(handle) => {
543                if matches!(
544                    handle.runtime_flavor(),
545                    tokio::runtime::RuntimeFlavor::MultiThread
546                ) {
547                    tokio::task::block_in_place(|| {
548                        let _ = handle.block_on(tokio::time::timeout(timeout, self.shutdown()));
549                    });
550                } else {
551                    // Case 3: current-thread runtime — the only safe
552                    // option is to refuse silently rather than panic.
553                    // Operators in this context are expected to call
554                    // the async `Observer::shutdown()` themselves.
555                    eprintln!(
556                        "obs: shutdown_blocking called from a current-thread tokio runtime; use \
557                         `Observer::shutdown().await` instead"
558                    );
559                }
560            }
561        }
562    }
563
564    fn callsites(&self) -> Option<Arc<ObsCallsiteRegistry>> {
565        Some(Arc::clone(&self.callsites))
566    }
567
568    fn schema_registry(&self) -> Option<Arc<SchemaRegistry>> {
569        Some(Arc::clone(&self.registry))
570    }
571
572    fn resource_attrs(&self) -> Arc<ResourceAttrs> {
573        self.resource.load_full()
574    }
575}
576
577/// Builder for [`StandardObserver`].
578pub struct StandardObserverBuilder {
579    router: SinkRouter,
580    registry: Option<Arc<SchemaRegistry>>,
581    config: Option<EventsConfig>,
582    filter_spec: Option<String>,
583    service: Option<String>,
584    instance: Option<String>,
585    version: Option<String>,
586    spawn_workers: bool,
587}
588
589impl Default for StandardObserverBuilder {
590    fn default() -> Self {
591        Self {
592            router: SinkRouter::default(),
593            registry: None,
594            config: None,
595            filter_spec: None,
596            service: None,
597            instance: None,
598            version: None,
599            spawn_workers: true,
600        }
601    }
602}
603
604impl std::fmt::Debug for StandardObserverBuilder {
605    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
606        f.debug_struct("StandardObserverBuilder")
607            .field("service", &self.service)
608            .field("version", &self.version)
609            .field("spawn_workers", &self.spawn_workers)
610            .finish_non_exhaustive()
611    }
612}
613
614impl StandardObserverBuilder {
615    /// Set service identity.
616    #[must_use]
617    pub fn service(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
618        self.service = Some(name.into());
619        self.version = Some(version.into());
620        self
621    }
622
623    /// Set instance id.
624    #[must_use]
625    pub fn instance(mut self, instance: impl Into<String>) -> Self {
626        self.instance = Some(instance.into());
627        self
628    }
629
630    /// Wire a sink for a specific tier. Calling twice replaces the
631    /// prior sink.
632    #[must_use]
633    pub fn sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
634        match tier {
635            Tier::Log => self.router.log = Some(sink),
636            Tier::Metric => self.router.metric = Some(sink),
637            Tier::Trace => self.router.trace = Some(sink),
638            Tier::Audit => self.router.audit = Some(sink),
639            _ => {}
640        }
641        self
642    }
643
644    /// Wire a fallback sink.
645    #[must_use]
646    pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
647        self.router.fallback = Some(sink);
648        self
649    }
650
651    /// Set an explicit config.
652    #[must_use]
653    pub fn config(mut self, cfg: EventsConfig) -> Self {
654        self.config = Some(cfg);
655        self
656    }
657
658    /// Set the filter spec (overrides anything in `config.filter`).
659    #[must_use]
660    pub fn filter(mut self, spec: impl Into<String>) -> Self {
661        self.filter_spec = Some(spec.into());
662        self
663    }
664
665    /// Use a specific schema registry.
666    #[must_use]
667    pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
668        self.registry = Some(registry);
669        self
670    }
671
672    /// Spawn per-tier mpsc workers when a tokio runtime is available
673    /// (default `true`). Disable for synchronous tests that want
674    /// in-emit-thread delivery.
675    #[must_use]
676    pub fn spawn_workers(mut self, yes: bool) -> Self {
677        self.spawn_workers = yes;
678        self
679    }
680
681    /// Finalise.
682    ///
683    /// # Errors
684    ///
685    /// Returns `BuildError` when config validation or filter parsing
686    /// fails.
687    pub fn build(self) -> Result<StandardObserver, BuildError> {
688        let mut cfg = self.config.unwrap_or_default();
689        // Spec 13 § 2.3 / 60 § 7 / spec 94 § 3.10: `OBS_DEV=1` forces
690        // dev-mode on regardless of what `obs.yaml` says. Recognised
691        // values are `1` / `true` / `yes`.
692        if !cfg.dev_mode
693            && let Ok(v) = std::env::var("OBS_DEV")
694        {
695            let on = matches!(
696                v.trim().to_ascii_lowercase().as_str(),
697                "1" | "true" | "yes" | "on"
698            );
699            cfg.dev_mode = on;
700        }
701        cfg.validate().map_err(BuildError::InvalidConfig)?;
702
703        let filter_spec = self
704            .filter_spec
705            .or_else(|| cfg.filter.clone())
706            .or_else(|| std::env::var("OBS_FILTER").ok());
707        let filter = match filter_spec.as_deref() {
708            Some(spec) => Filter::parse(spec).map_err(|e| {
709                BuildError::InvalidConfig(crate::config::ConfigError::invalid_range(
710                    "filter",
711                    format!("{e}"),
712                ))
713            })?,
714            None => Filter::new(),
715        };
716
717        let registry = self
718            .registry
719            .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
720
721        // Service defaults from env.
722        let service = self
723            .service
724            .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
725            .unwrap_or_else(|| "obs".to_string());
726        let version = self
727            .version
728            .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
729        let instance = self.instance.unwrap_or_default();
730
731        let counters = Arc::new(WorkerCounters::default());
732        let spool = if self.router.audit.is_some() {
733            Some(Arc::new(
734                SpoolWriter::open_with_fsync(
735                    cfg.audit.spool_dir.clone(),
736                    cfg.audit.spool_max_bytes,
737                    cfg.audit.on_failure,
738                    cfg.audit.fsync_mode,
739                )
740                .map_err(BuildError::SpoolOpen)?,
741            ))
742        } else {
743            None
744        };
745        let workers = if self.spawn_workers {
746            spawn_pool(&self.router, &registry, &counters, &cfg.queues)
747        } else {
748            WorkerPool::default()
749        };
750
751        let resource = build_resource_from_env(&service, &version, &instance);
752        let observer = StandardObserver {
753            router: self.router,
754            workers,
755            spool,
756            registry,
757            callsites: Arc::new(ObsCallsiteRegistry::new()),
758            config: ArcSwap::from_pointee(cfg),
759            filter: ArcSwap::from_pointee(filter),
760            resource: ArcSwap::from_pointee(resource),
761            counters,
762            generation: AtomicU32::new(1),
763            service,
764            instance,
765            version,
766            sync_dispatch_lock: Mutex::new(()),
767        };
768        // Spec 11 § 6.4: at observer init, drain any `*.audit.bin`
769        // files left over from a prior process. Each recovered record
770        // is enqueued onto the AUDIT worker; one
771        // `ObsAuditSpoolRecovered` self-event is emitted with the total
772        // count.
773        observer.recover_audit_spool();
774        // Spec 11 § 10 / spec 93 P1-2: announce the registry size so
775        // operators can correlate sink behaviour with the schema set
776        // currently linked into the binary.
777        let schema_count = observer.registry.len() as u64;
778        crate::self_events::emit_registry_initialized(schema_count, 0);
779        Ok(observer)
780    }
781}
782
783/// Populate a [`ResourceAttrs`] from `OTEL_SERVICE_NAME` /
784/// `OTEL_RESOURCE_ATTRIBUTES` plus the observer's own service /
785/// version / instance identity. Spec 20 § 2.1 / spec 94 § 2.7.
786fn build_resource_from_env(service: &str, version: &str, instance: &str) -> ResourceAttrs {
787    let mut r = ResourceAttrs {
788        service_name: service.to_string(),
789        service_version: version.to_string(),
790        service_instance_id: instance.to_string(),
791        ..Default::default()
792    };
793    if let Ok(name) = std::env::var("OTEL_SERVICE_NAME")
794        && !name.is_empty()
795    {
796        r.service_name = name;
797    }
798    if let Ok(extras) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") {
799        for pair in extras.split(',') {
800            let pair = pair.trim();
801            if pair.is_empty() {
802                continue;
803            }
804            if let Some((k, v)) = pair.split_once('=') {
805                let key = k.trim();
806                let val = v.trim().to_string();
807                match key {
808                    "service.name" => r.service_name = val,
809                    "service.version" => r.service_version = val,
810                    "service.namespace" => r.service_namespace = val,
811                    "service.instance.id" => r.service_instance_id = val,
812                    "deployment.environment" => r.deployment_environment = val,
813                    "host.name" => r.host_name = val,
814                    "host.arch" => r.host_arch = val,
815                    _ => {
816                        r.extra.insert(key.to_string(), val);
817                    }
818                }
819            }
820        }
821    }
822    if r.host_arch.is_empty() {
823        r.host_arch = match std::env::consts::ARCH {
824            "x86_64" => "amd64".to_string(),
825            "aarch64" => "arm64".to_string(),
826            other => other.to_string(),
827        };
828    }
829    if r.host_name.is_empty()
830        && let Ok(host) = std::env::var("HOSTNAME")
831    {
832        r.host_name = host;
833    }
834    r
835}
836
837fn config_hash(cfg: &EventsConfig) -> u64 {
838    let bytes = match serde_yaml::to_string(cfg) {
839        Ok(s) => s.into_bytes(),
840        Err(_) => return 0,
841    };
842    let h = blake3::hash(&bytes);
843    let arr: [u8; 8] = match <[u8; 8]>::try_from(&h.as_bytes()[..8]) {
844        Ok(a) => a,
845        Err(_) => return 0,
846    };
847    u64::from_le_bytes(arr)
848}
849
850fn spawn_pool(
851    router: &SinkRouter,
852    registry: &Arc<SchemaRegistry>,
853    counters: &Arc<WorkerCounters>,
854    queues: &crate::config::QueuesConfig,
855) -> WorkerPool {
856    let mut pool = WorkerPool::default();
857    if let Some(sink) = router.log.as_ref().or(router.fallback.as_ref()) {
858        pool.log = spawn_tier_worker(
859            Tier::Log,
860            queues,
861            Arc::clone(sink),
862            Arc::clone(registry),
863            Arc::clone(counters),
864        );
865    }
866    if let Some(sink) = router.metric.as_ref().or(router.fallback.as_ref()) {
867        pool.metric = spawn_tier_worker(
868            Tier::Metric,
869            queues,
870            Arc::clone(sink),
871            Arc::clone(registry),
872            Arc::clone(counters),
873        );
874    }
875    if let Some(sink) = router.trace.as_ref().or(router.fallback.as_ref()) {
876        pool.trace = spawn_tier_worker(
877            Tier::Trace,
878            queues,
879            Arc::clone(sink),
880            Arc::clone(registry),
881            Arc::clone(counters),
882        );
883    }
884    if let Some(sink) = router.audit.as_ref() {
885        pool.audit = spawn_tier_worker(
886            Tier::Audit,
887            queues,
888            Arc::clone(sink),
889            Arc::clone(registry),
890            Arc::clone(counters),
891        );
892    }
893    pool
894}
895
/// Errors returned by [`StandardObserverBuilder::build`].
///
/// Marked `#[non_exhaustive]`, so `match`es outside this crate need a
/// wildcard arm.
// NOTE(review): both variants expose the inner error as `source()`
// (`#[from]` implies `#[source]`) *and* interpolate it into `Display`
// via `{0}`, so error-chain reporters print the cause twice. Dropping the
// `: {0}` suffix would change user-visible text — confirm no caller or
// test matches on these messages before doing so.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum BuildError {
    /// Config validation failed. Wraps the underlying
    /// [`crate::config::ConfigError`]; `#[from]` lets `?` convert it.
    #[error("invalid config: {0}")]
    InvalidConfig(#[from] crate::config::ConfigError),
    /// AUDIT spool could not be opened. Wraps the I/O error from the open.
    #[error("audit spool open failed: {0}")]
    SpoolOpen(#[source] std::io::Error),
}
907
908#[allow(dead_code)]
909fn _ensure_noop_compiles() {
910    let _: Arc<dyn Sink> = Arc::new(NoopSink);
911}
912
#[cfg(test)]
mod tests {
    use obs_proto::obs::v1::{
        ObsEnvelope, SamplingReason as PSamplingReason, Severity as PSev, Tier as PTier,
    };

    use super::*;
    use crate::resource::ResourceAttrs;

    /// Observer identified as `test`/`1.0.0` that routes every tier to a
    /// [`NoopSink`] and spawns no workers.
    fn noop_observer() -> StandardObserver {
        StandardObserver::builder()
            .service("test", "1.0.0")
            .sink_fallback(Arc::new(NoopSink))
            .spawn_workers(false)
            .build()
            .expect("build")
    }

    #[test]
    fn test_oversized_label_value_drops_envelope() {
        // Spec 94 § 3.5 / P2-E: a label value larger than
        // `limits.max_label_value_bytes` must make the pipeline drop the
        // envelope via `ObsOversizedDropped { reason = "label" }`.
        let observer = noop_observer();
        // 2 KiB: twice the 1 KiB default cap.
        let oversized = "x".repeat(2048);
        let mut env = ObsEnvelope {
            full_name: "test.v1.ObsBig".to_string(),
            tier: ::buffa::EnumValue::Known(PTier::TIER_LOG),
            sev: ::buffa::EnumValue::Known(PSev::SEVERITY_INFO),
            sampling_reason: ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_HEAD_RATE),
            ..Default::default()
        };
        env.labels.insert("ua".to_string(), oversized);
        assert!(
            !observer.run_emit_pipeline(&mut env, PSev::Info),
            "envelope with oversize label value must be dropped"
        );
    }

    #[test]
    fn test_set_resource_attrs_is_visible_to_observer_callers() {
        // Spec 94 § 2.7 / P1-E: resource attrs live behind one ArcSwap on
        // the observer, so a store must be visible to every later load.
        let observer = noop_observer();
        assert_eq!(observer.resource_attrs().service_name, "test");

        let rotated = ResourceAttrs {
            service_name: "rotated".to_string(),
            deployment_environment: "prod".to_string(),
            ..Default::default()
        };
        observer.set_resource_attrs(rotated);

        let current = observer.resource_attrs();
        assert_eq!(current.service_name, "rotated");
        assert_eq!(current.deployment_environment, "prod");
    }
}