Skip to main content

obs_core/observer/
standard.rs

1//! `StandardObserver` — production-ready observer with per-tier
2//! workers, AUDIT spool, head sampler, scope auto-fill, and live
3//! config reload. Spec 11 §§ 3, 4, 6.4 + spec 13 §§ 2, 6.
4
5use std::sync::{
6    Arc,
7    atomic::{AtomicU32, Ordering},
8};
9
10use arc_swap::ArcSwap;
11use bytes;
12use obs_proto::obs::v1::{ObsEnvelope, SamplingReason as PSamplingReason};
13use obs_types::Tier;
14use parking_lot::Mutex;
15
16use super::{
17    Observer,
18    workers::{TierWorker, WorkerCounters, note_channel_full, spawn_tier_worker},
19};
20use crate::{
21    audit_spool::SpoolWriter,
22    callsite::ObsCallsite,
23    config::{AuditFailureMode, EventsConfig},
24    filter::Filter,
25    registry::{ObsCallsiteRegistry, SchemaRegistry, ScrubbedEnvelope},
26    resource::ResourceAttrs,
27    sampling::{SamplingDecision, decide as sample_decide},
28    scope::{auto_fill_envelope, inbound_traceparent_sampled, push_tail_buffer},
29    sink::{NoopSink, Sink, SinkFut, StdoutSink},
30};
31
32/// Tier-matching dispatcher. One sink slot per tier plus a fallback.
33#[derive(Default)]
34struct SinkRouter {
35    log: Option<Arc<dyn Sink>>,
36    metric: Option<Arc<dyn Sink>>,
37    trace: Option<Arc<dyn Sink>>,
38    audit: Option<Arc<dyn Sink>>,
39    fallback: Option<Arc<dyn Sink>>,
40}
41
42impl std::fmt::Debug for SinkRouter {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("SinkRouter")
45            .field("log", &self.log.as_ref().map(|_| "..."))
46            .field("metric", &self.metric.as_ref().map(|_| "..."))
47            .field("trace", &self.trace.as_ref().map(|_| "..."))
48            .field("audit", &self.audit.as_ref().map(|_| "..."))
49            .field("fallback", &self.fallback.as_ref().map(|_| "..."))
50            .finish()
51    }
52}
53
54impl SinkRouter {
55    fn for_tier(&self, tier: Tier) -> Option<&Arc<dyn Sink>> {
56        let primary = match tier {
57            Tier::Log => self.log.as_ref(),
58            Tier::Metric => self.metric.as_ref(),
59            Tier::Trace => self.trace.as_ref(),
60            Tier::Audit => self.audit.as_ref(),
61            _ => None,
62        };
63        primary.or(self.fallback.as_ref())
64    }
65}
66
67/// Worker handles, indexed by tier; AUDIT is special (spool path).
68#[derive(Debug, Default)]
69struct WorkerPool {
70    log: Option<TierWorker>,
71    metric: Option<TierWorker>,
72    trace: Option<TierWorker>,
73    audit: Option<TierWorker>,
74}
75
76/// Production-ready observer with reloadable config and per-tier
77/// worker pool.
78pub struct StandardObserver {
79    router: SinkRouter,
80    workers: WorkerPool,
81    spool: Option<Arc<SpoolWriter>>,
82    registry: Arc<SchemaRegistry>,
83    callsites: Arc<ObsCallsiteRegistry>,
84    config: ArcSwap<EventsConfig>,
85    filter: ArcSwap<Filter>,
86    /// Workspace-shared OTel resource attribute set; sinks read the
87    /// snapshot at flush time. Spec 20 § 2.1 / spec 94 § 2.7 / P1-E.
88    resource: ArcSwap<ResourceAttrs>,
89    counters: Arc<WorkerCounters>,
90    generation: AtomicU32,
91    service: String,
92    instance: String,
93    version: String,
94    /// Synchronous fallback for environments without a tokio runtime
95    /// (tests, CLI tools): protects in-thread sink dispatch.
96    sync_dispatch_lock: Mutex<()>,
97}
98
99impl std::fmt::Debug for StandardObserver {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.debug_struct("StandardObserver")
102            .field("schemas", &self.registry.len())
103            .field("service", &self.service)
104            .field("instance", &self.instance)
105            .field("version", &self.version)
106            .field("generation", &self.generation.load(Ordering::Relaxed))
107            .finish_non_exhaustive()
108    }
109}
110
111impl StandardObserver {
112    /// Builder entry.
113    #[must_use]
114    pub fn builder() -> StandardObserverBuilder {
115        StandardObserverBuilder::default()
116    }
117
118    /// Convenience: `StandardObserver` with `StdoutSink(Full)` as
119    /// fallback.
120    ///
121    /// # Errors
122    ///
123    /// Returns `BuildError` if config validation fails.
124    pub fn dev() -> Result<Self, BuildError> {
125        Self::builder()
126            .service("dev", env!("CARGO_PKG_VERSION"))
127            .sink_fallback(Arc::new(StdoutSink::default()))
128            .build()
129    }
130
131    /// Read-only access to the registry (used by sinks).
132    #[must_use]
133    pub fn registry(&self) -> Arc<SchemaRegistry> {
134        Arc::clone(&self.registry)
135    }
136
137    /// Read-only access to the per-process callsite registry. Used by
138    /// the bridge (Direction A inserts; Direction B reads to
139    /// reconstitute `tracing::Metadata`). Spec 31 § 3.2.
140    #[must_use]
141    pub fn callsites(&self) -> Arc<ObsCallsiteRegistry> {
142        Arc::clone(&self.callsites)
143    }
144
145    /// Read-only access to the live config.
146    #[must_use]
147    pub fn config(&self) -> arc_swap::Guard<Arc<EventsConfig>> {
148        self.config.load()
149    }
150
151    /// Atomically swap the resource attribute set. Sinks pick up the
152    /// new value on the next [`Observer::resource_attrs`] call.
153    /// Spec 20 § 2.1 / spec 94 § 2.7 / P1-E.
154    pub fn set_resource_attrs(&self, attrs: ResourceAttrs) {
155        self.resource.store(Arc::new(attrs));
156    }
157
158    /// Atomically swap the config and bump the generation so all
159    /// callsites re-probe. Spec 11 § 3.2.
160    ///
161    /// # Errors
162    ///
163    /// Returns `BuildError::InvalidConfig` if validation rejects
164    /// `new_config`.
165    pub fn reload_config(&self, new_config: EventsConfig) -> Result<(), BuildError> {
166        if let Err(e) = new_config.validate() {
167            crate::self_events::emit_config_reload_failed(&format!("validate: {e}"));
168            return Err(BuildError::InvalidConfig(e));
169        }
170        if let Some(spec) = new_config.filter.as_deref() {
171            match Filter::parse(spec) {
172                Ok(parsed) => self.filter.store(Arc::new(parsed)),
173                Err(e) => {
174                    crate::self_events::emit_config_reload_failed(&format!("filter: {e}"));
175                    return Err(BuildError::InvalidConfig(
176                        crate::config::ConfigError::invalid_range("filter", format!("{e}")),
177                    ));
178                }
179            }
180        } else {
181            self.filter.store(Arc::new(Filter::new()));
182        }
183        // Cheap rolling hash so operators can correlate
184        // `ObsConfigReloaded` events with config-file generations
185        // without leaking field values into labels.
186        let cfg_hash = config_hash(&new_config);
187        self.config.store(Arc::new(new_config));
188        self.generation.fetch_add(1, Ordering::Release);
189        crate::self_events::emit_config_reloaded(cfg_hash);
190        Ok(())
191    }
192
193    /// Read-only access to the live filter.
194    #[must_use]
195    pub fn filter(&self) -> Arc<Filter> {
196        self.filter.load_full()
197    }
198
199    /// Worker counters surface for tests + diagnostics.
200    #[must_use]
201    pub fn counters(&self) -> Arc<WorkerCounters> {
202        Arc::clone(&self.counters)
203    }
204
205    fn fill_identity(&self, env: &mut ObsEnvelope) {
206        if env.service.is_empty() {
207            env.service.clone_from(&self.service);
208        }
209        if env.instance.is_empty() {
210            env.instance.clone_from(&self.instance);
211        }
212        if env.version.is_empty() {
213            env.version.clone_from(&self.version);
214        }
215    }
216
217    fn dispatch_sync(&self, env: ObsEnvelope, tier: Tier) {
218        // No tokio runtime ⇒ deliver in-emit-thread. The spec's
219        // "scrubber on the worker thread" rule (spec 11 § 4.1) still
220        // applies: sinks must never see an unscrubbed envelope, so we
221        // run the scrubber here and then dispatch.
222        let _g = self.sync_dispatch_lock.lock();
223        let Some(sink) = self.router.for_tier(tier) else {
224            return;
225        };
226        let mut scratch = bytes::BytesMut::with_capacity(env.payload.len());
227        let scrubbed = match ScrubbedEnvelope::scrub(&env, &self.registry, &mut scratch) {
228            Ok(s) => s,
229            Err(_) => return,
230        };
231        sink.deliver(scrubbed);
232    }
233
234    fn dispatch_async(&self, env: ObsEnvelope, tier: Tier) {
235        let worker = match tier {
236            Tier::Log => self.workers.log.as_ref(),
237            Tier::Metric => self.workers.metric.as_ref(),
238            Tier::Trace => self.workers.trace.as_ref(),
239            Tier::Audit => self.workers.audit.as_ref(),
240            _ => None,
241        };
242        let Some(worker) = worker else {
243            // No worker (no tokio runtime) — fall back to sync.
244            self.dispatch_sync(env, tier);
245            return;
246        };
247        if tier == Tier::Audit {
248            self.dispatch_audit(worker, env);
249        } else {
250            match worker.try_send(env) {
251                Ok(()) => {}
252                Err(_dropped) => {
253                    note_channel_full(&self.counters, tier);
254                }
255            }
256        }
257    }
258
259    fn dispatch_audit(&self, worker: &TierWorker, env: ObsEnvelope) {
260        let cfg = self.config.load();
261        let block_ms = u64::from(cfg.audit.block_ms_max);
262        // First try a non-blocking send; if it succeeds, we're done.
263        let mut env_unsent = match worker.try_send(env) {
264            Ok(()) => return,
265            Err(env) => env,
266        };
267        // Fall back to bounded busy-wait with the configured timeout.
268        // We deliberately do NOT call `Handle::block_on` here — that
269        // panics when the caller is already inside a runtime. Instead,
270        // poll `try_send` with a short sleep to honour the documented
271        // "bounded blocking" semantics (spec 11 § 6.4) without relying
272        // on `block_on`. The total wall-clock blocking is bounded by
273        // `audit.block_ms_max`.
274        let started = std::time::Instant::now();
275        let interval = std::time::Duration::from_millis(2);
276        while started.elapsed().as_millis() < u128::from(block_ms) {
277            match worker.try_send(env_unsent) {
278                Ok(()) => return,
279                Err(env) => env_unsent = env,
280            }
281            std::thread::sleep(interval);
282        }
283        // Channel still full ⇒ spool to disk.
284        if let Some(spool) = self.spool.as_ref() {
285            match spool.append(&env_unsent) {
286                Ok(()) => {
287                    note_channel_full(&self.counters, Tier::Audit);
288                    crate::self_events::emit_audit_spooled(env_unsent.full_name.as_str());
289                }
290                Err(e) => {
291                    crate::self_events::emit_audit_spool_failed(&e.to_string());
292                    self.handle_spool_failure();
293                }
294            }
295        } else {
296            crate::self_events::emit_audit_spool_failed("no spool configured");
297            self.handle_spool_failure();
298        }
299    }
300
301    fn handle_spool_failure(&self) {
302        // The AUDIT-tier failure path is documented as a deliberate
303        // policy escalation (spec 11 § 6.4); the choice between panic /
304        // abort / warn_only is taken from `audit.on_failure`. Allow
305        // `clippy::panic` here because the panic is the documented
306        // escape hatch when the operator picked `Panic` mode.
307        #[allow(clippy::panic)]
308        {
309            let cfg = self.config.load();
310            match cfg.audit.on_failure {
311                AuditFailureMode::Panic => {
312                    panic!("audit spool unwritable; compliance failure")
313                }
314                AuditFailureMode::Abort => std::process::abort(),
315                AuditFailureMode::WarnOnly => {
316                    eprintln!("[obs] AUDIT spool unwritable; envelope dropped (warn_only)");
317                }
318            }
319        }
320    }
321
322    /// Drain any `*.audit.bin` files left in `audit.spool_dir` by a
323    /// prior process. Recovered envelopes are dispatched through the
324    /// AUDIT worker (or the sync fallback if no runtime is alive). One
325    /// `ObsAuditSpoolRecovered` self-event is emitted at the end with
326    /// the total count. Spec 11 § 6.4.
327    fn recover_audit_spool(&self) {
328        let cfg = self.config.load();
329        let dir = cfg.audit.spool_dir.clone();
330        if !dir.exists() {
331            return;
332        }
333        let mut total: u64 = 0;
334        let report = crate::audit_spool::recover(&dir, |env| {
335            total += 1;
336            // Re-enqueue: sync if no worker, async otherwise.
337            if let Some(worker) = self.workers.audit.as_ref() {
338                let _ = worker.try_send(env);
339            } else {
340                self.dispatch_sync(env, Tier::Audit);
341            }
342            Ok(())
343        });
344        if total == 0 {
345            let _ = report;
346            return;
347        }
348        let mut env = ObsEnvelope {
349            full_name: "obs.runtime.v1.ObsAuditSpoolRecovered".to_string(),
350            tier: ::buffa::EnumValue::Known(obs_proto::obs::v1::Tier::TIER_LOG),
351            sev: ::buffa::EnumValue::Known(obs_proto::obs::v1::Severity::SEVERITY_INFO),
352            ..Default::default()
353        };
354        env.labels
355            .insert("record_count".to_string(), total.to_string());
356        // Route directly through this observer (the global may not be
357        // installed yet at builder-time).
358        self.fill_identity(&mut env);
359        self.dispatch_sync(env, Tier::Log);
360    }
361
362    /// Apply scope auto-fill, head sampling, and tail-buffer push to
363    /// `env`. Returns `true` when the envelope should continue down
364    /// the per-tier worker; `false` when it was dropped or counted as
365    /// a buffer push.
366    fn run_emit_pipeline(&self, env: &mut ObsEnvelope, sev: obs_types::Severity) -> bool {
367        // Step 3 (post-project): auto-fill from scope frame stack.
368        auto_fill_envelope(env);
369        // Step 3.0: enforce `limits.max_payload_bytes` (spec 11 § 6.2 /
370        // spec 93 P2-10). An oversized envelope is dropped and a
371        // `ObsOversizedDropped` self-event records the drop with a
372        // `full_name` label so operators can find the noisy schema.
373        let cfg_pre = self.config.load();
374        let max_bytes = u64::from(cfg_pre.limits.max_payload_bytes);
375        let payload_size = env.payload.len() as u64;
376        if max_bytes > 0 && payload_size > max_bytes {
377            crate::self_events::emit_oversized_dropped(env.full_name.as_str(), payload_size);
378            return false;
379        }
380        // Step 3.1 (spec 11 § 6.2 / spec 94 § 3.5 / P2-E): enforce
381        // `limits.max_label_value_bytes`. A single oversize label value
382        // is treated as DoS-class input — User-Agent floods and
383        // attacker-controlled headers were the motivation. The
384        // envelope is dropped with `reason = "label"` and the offending
385        // label key is recorded in `size_bytes` so operators can grep
386        // by `(full_name, label_name, size)`.
387        let max_label_bytes = u64::from(cfg_pre.limits.max_label_value_bytes);
388        if max_label_bytes > 0 {
389            for (k, v) in &env.labels {
390                if v.len() as u64 > max_label_bytes {
391                    crate::self_events::emit_oversized_label_dropped(
392                        env.full_name.as_str(),
393                        k,
394                        v.len() as u64,
395                    );
396                    return false;
397                }
398            }
399        }
400        // Step 3a: per-emit dynamic filter directive (`[field=value]=level`).
401        // Spec 13 § 7.1 / spec 93 P0-5. Skipped only for sampler bypasses
402        // — those decisions live in `sample_decide`, which honours
403        // `SamplingReason::Forensic / Audit / Override`.
404        let filter = self.filter.load();
405        if !filter.event_allowed(env, sev) {
406            return false;
407        }
408        // Step 4: head sampler. Spec 13 § 6 / spec 93 P2-7: forensic /
409        // audit / override emits bypass the sampler unconditionally.
410        let bypass_sampler = matches!(
411            env.sampling_reason,
412            ::buffa::EnumValue::Known(
413                PSamplingReason::SAMPLING_REASON_FORENSIC
414                    | PSamplingReason::SAMPLING_REASON_AUDIT
415                    | PSamplingReason::SAMPLING_REASON_OVERRIDE,
416            )
417        );
418        if bypass_sampler {
419            return true;
420        }
421        let cfg = self.config.load();
422        let inbound = inbound_traceparent_sampled();
423        match sample_decide(&cfg.sampling, env.full_name.as_str(), sev, inbound) {
424            SamplingDecision::Drop => {
425                return false;
426            }
427            SamplingDecision::Keep => {}
428            SamplingDecision::ParentSet { sampled: true } => {
429                env.sampling_reason =
430                    ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_OVERRIDE);
431            }
432            SamplingDecision::ParentSet { sampled: false } => {
433                return false;
434            }
435        }
436        // Step 5: tail-on-error push (TRACE/DEBUG only).
437        if matches!(sev, obs_types::Severity::Trace | obs_types::Severity::Debug) {
438            push_tail_buffer(env);
439        } else if sev >= obs_types::Severity::Error {
440            crate::scope::mark_error_on_active_scopes();
441        }
442        true
443    }
444}
445
446impl Observer for StandardObserver {
447    fn emit_envelope(&self, mut env: ObsEnvelope) {
448        self.fill_identity(&mut env);
449        let sev = match env.sev {
450            ::buffa::EnumValue::Known(s) => proto_sev_to_native(s),
451            ::buffa::EnumValue::Unknown(_) => obs_types::Severity::Unspecified,
452        };
453        if !self.run_emit_pipeline(&mut env, sev) {
454            return;
455        }
456        let tier = match env.tier {
457            ::buffa::EnumValue::Known(t) => proto_tier_to_native(t),
458            ::buffa::EnumValue::Unknown(_) => Tier::Unspecified,
459        };
460        if let Ok(_h) = tokio::runtime::Handle::try_current() {
461            self.dispatch_async(env, tier);
462        } else {
463            self.dispatch_sync(env, tier);
464        }
465    }
466
467    fn enabled(&self, callsite: &ObsCallsite) -> bool {
468        // Spec 13 § 7 / spec 94 § 2.2: defer the entire decision to
469        // `Filter::callsite_interest`. The earlier `||` shortcut against
470        // the default-level floor silently bypassed `=off` directives
471        // when a callsite's static severity satisfied the global floor.
472        // The filter already handles bare-level / target=level / =off /
473        // dynamic precedence in one place.
474        let filter = self.filter.load();
475        filter.callsite_interest(callsite) != crate::callsite::Interest::Never
476    }
477
478    fn generation(&self) -> u32 {
479        self.generation.load(Ordering::Acquire)
480    }
481
482    fn reload_filter(&self) {
483        self.generation.fetch_add(1, Ordering::Release);
484    }
485
486    fn flush(&self) -> SinkFut<'_> {
487        Box::pin(async move {
488            for w in [
489                self.workers.log.as_ref(),
490                self.workers.metric.as_ref(),
491                self.workers.trace.as_ref(),
492                self.workers.audit.as_ref(),
493            ]
494            .iter()
495            .flatten()
496            {
497                w.flush().await;
498            }
499        })
500    }
501
502    fn shutdown(&self) -> SinkFut<'_> {
503        Box::pin(async move {
504            for w in [
505                self.workers.log.as_ref(),
506                self.workers.metric.as_ref(),
507                self.workers.trace.as_ref(),
508                self.workers.audit.as_ref(),
509            ]
510            .iter()
511            .flatten()
512            {
513                w.shutdown().await;
514            }
515            if let Some(spool) = self.spool.as_ref() {
516                spool.close();
517            }
518        })
519    }
520
521    fn shutdown_blocking(&self, timeout: std::time::Duration) {
522        // Caller contexts the panic-hook + drain helpers must work in:
523        // (1) outside any tokio runtime → spin up a single-thread one
524        //     inline and run the async shutdown to completion;
525        // (2) on a tokio worker thread → use `block_in_place` so we
526        //     can `Handle::block_on` without dead-locking;
527        // (3) on a current-thread runtime → cannot block the active
528        //     executor reentrantly. We log a one-shot warning and
529        //     fall through; callers in this context should `await`
530        //     `Observer::shutdown()` directly. Spec 93 P2 follow-up.
531        match tokio::runtime::Handle::try_current() {
532            Err(_) => {
533                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
534                    .enable_all()
535                    .build()
536                {
537                    let _ = rt.block_on(tokio::time::timeout(timeout, self.shutdown()));
538                }
539            }
540            Ok(handle) => {
541                if matches!(
542                    handle.runtime_flavor(),
543                    tokio::runtime::RuntimeFlavor::MultiThread
544                ) {
545                    tokio::task::block_in_place(|| {
546                        let _ = handle.block_on(tokio::time::timeout(timeout, self.shutdown()));
547                    });
548                } else {
549                    // Case 3: current-thread runtime — the only safe
550                    // option is to refuse silently rather than panic.
551                    // Operators in this context are expected to call
552                    // the async `Observer::shutdown()` themselves.
553                    eprintln!(
554                        "obs: shutdown_blocking called from a current-thread tokio runtime; use \
555                         `Observer::shutdown().await` instead"
556                    );
557                }
558            }
559        }
560    }
561
562    fn callsites(&self) -> Option<Arc<ObsCallsiteRegistry>> {
563        Some(Arc::clone(&self.callsites))
564    }
565
566    fn schema_registry(&self) -> Option<Arc<SchemaRegistry>> {
567        Some(Arc::clone(&self.registry))
568    }
569
570    fn resource_attrs(&self) -> Arc<ResourceAttrs> {
571        self.resource.load_full()
572    }
573}
574
575#[allow(non_snake_case, non_upper_case_globals)]
576fn proto_tier_to_native(t: obs_proto::obs::v1::Tier) -> Tier {
577    use obs_proto::obs::v1::Tier as P;
578    match t {
579        P::TIER_UNSPECIFIED => Tier::Unspecified,
580        P::TIER_LOG => Tier::Log,
581        P::TIER_METRIC => Tier::Metric,
582        P::TIER_TRACE => Tier::Trace,
583        P::TIER_AUDIT => Tier::Audit,
584    }
585}
586
587#[allow(non_snake_case, non_upper_case_globals)]
588fn proto_sev_to_native(s: obs_proto::obs::v1::Severity) -> obs_types::Severity {
589    use obs_proto::obs::v1::Severity as P;
590    match s {
591        P::SEVERITY_UNSPECIFIED => obs_types::Severity::Unspecified,
592        P::SEVERITY_TRACE => obs_types::Severity::Trace,
593        P::SEVERITY_DEBUG => obs_types::Severity::Debug,
594        P::SEVERITY_INFO => obs_types::Severity::Info,
595        P::SEVERITY_WARN => obs_types::Severity::Warn,
596        P::SEVERITY_ERROR => obs_types::Severity::Error,
597        P::SEVERITY_FATAL => obs_types::Severity::Fatal,
598    }
599}
600
601/// Builder for [`StandardObserver`].
602pub struct StandardObserverBuilder {
603    router: SinkRouter,
604    registry: Option<Arc<SchemaRegistry>>,
605    config: Option<EventsConfig>,
606    filter_spec: Option<String>,
607    service: Option<String>,
608    instance: Option<String>,
609    version: Option<String>,
610    spawn_workers: bool,
611}
612
613impl Default for StandardObserverBuilder {
614    fn default() -> Self {
615        Self {
616            router: SinkRouter::default(),
617            registry: None,
618            config: None,
619            filter_spec: None,
620            service: None,
621            instance: None,
622            version: None,
623            spawn_workers: true,
624        }
625    }
626}
627
628impl std::fmt::Debug for StandardObserverBuilder {
629    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
630        f.debug_struct("StandardObserverBuilder")
631            .field("service", &self.service)
632            .field("version", &self.version)
633            .field("spawn_workers", &self.spawn_workers)
634            .finish_non_exhaustive()
635    }
636}
637
638impl StandardObserverBuilder {
639    /// Set service identity.
640    #[must_use]
641    pub fn service(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
642        self.service = Some(name.into());
643        self.version = Some(version.into());
644        self
645    }
646
647    /// Set instance id.
648    #[must_use]
649    pub fn instance(mut self, instance: impl Into<String>) -> Self {
650        self.instance = Some(instance.into());
651        self
652    }
653
654    /// Wire a sink for a specific tier. Calling twice replaces the
655    /// prior sink.
656    #[must_use]
657    pub fn sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
658        match tier {
659            Tier::Log => self.router.log = Some(sink),
660            Tier::Metric => self.router.metric = Some(sink),
661            Tier::Trace => self.router.trace = Some(sink),
662            Tier::Audit => self.router.audit = Some(sink),
663            _ => {}
664        }
665        self
666    }
667
668    /// Wire a fallback sink.
669    #[must_use]
670    pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
671        self.router.fallback = Some(sink);
672        self
673    }
674
675    /// Set an explicit config.
676    #[must_use]
677    pub fn config(mut self, cfg: EventsConfig) -> Self {
678        self.config = Some(cfg);
679        self
680    }
681
682    /// Set the filter spec (overrides anything in `config.filter`).
683    #[must_use]
684    pub fn filter(mut self, spec: impl Into<String>) -> Self {
685        self.filter_spec = Some(spec.into());
686        self
687    }
688
689    /// Use a specific schema registry.
690    #[must_use]
691    pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
692        self.registry = Some(registry);
693        self
694    }
695
696    /// Spawn per-tier mpsc workers when a tokio runtime is available
697    /// (default `true`). Disable for synchronous tests that want
698    /// in-emit-thread delivery.
699    #[must_use]
700    pub fn spawn_workers(mut self, yes: bool) -> Self {
701        self.spawn_workers = yes;
702        self
703    }
704
705    /// Finalise.
706    ///
707    /// # Errors
708    ///
709    /// Returns `BuildError` when config validation or filter parsing
710    /// fails.
711    pub fn build(self) -> Result<StandardObserver, BuildError> {
712        let mut cfg = self.config.unwrap_or_default();
713        // Spec 13 § 2.3 / 60 § 7 / spec 94 § 3.10: `OBS_DEV=1` forces
714        // dev-mode on regardless of what `obs.yaml` says. Recognised
715        // values are `1` / `true` / `yes`.
716        if !cfg.dev_mode
717            && let Ok(v) = std::env::var("OBS_DEV")
718        {
719            let on = matches!(
720                v.trim().to_ascii_lowercase().as_str(),
721                "1" | "true" | "yes" | "on"
722            );
723            cfg.dev_mode = on;
724        }
725        cfg.validate().map_err(BuildError::InvalidConfig)?;
726
727        let filter_spec = self
728            .filter_spec
729            .or_else(|| cfg.filter.clone())
730            .or_else(|| std::env::var("OBS_FILTER").ok());
731        let filter = match filter_spec.as_deref() {
732            Some(spec) => Filter::parse(spec).map_err(|e| {
733                BuildError::InvalidConfig(crate::config::ConfigError::invalid_range(
734                    "filter",
735                    format!("{e}"),
736                ))
737            })?,
738            None => Filter::new(),
739        };
740
741        let registry = self
742            .registry
743            .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
744
745        // Service defaults from env.
746        let service = self
747            .service
748            .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
749            .unwrap_or_else(|| "obs".to_string());
750        let version = self
751            .version
752            .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
753        let instance = self.instance.unwrap_or_default();
754
755        let counters = Arc::new(WorkerCounters::default());
756        let spool = if self.router.audit.is_some() {
757            Some(Arc::new(
758                SpoolWriter::open_with_fsync(
759                    cfg.audit.spool_dir.clone(),
760                    cfg.audit.spool_max_bytes,
761                    cfg.audit.on_failure,
762                    cfg.audit.fsync_mode,
763                )
764                .map_err(BuildError::SpoolOpen)?,
765            ))
766        } else {
767            None
768        };
769        let workers = if self.spawn_workers {
770            spawn_pool(&self.router, &registry, &counters, &cfg.queues)
771        } else {
772            WorkerPool::default()
773        };
774
775        let resource = build_resource_from_env(&service, &version, &instance);
776        let observer = StandardObserver {
777            router: self.router,
778            workers,
779            spool,
780            registry,
781            callsites: Arc::new(ObsCallsiteRegistry::new()),
782            config: ArcSwap::from_pointee(cfg),
783            filter: ArcSwap::from_pointee(filter),
784            resource: ArcSwap::from_pointee(resource),
785            counters,
786            generation: AtomicU32::new(1),
787            service,
788            instance,
789            version,
790            sync_dispatch_lock: Mutex::new(()),
791        };
792        // Spec 11 § 6.4: at observer init, drain any `*.audit.bin`
793        // files left over from a prior process. Each recovered record
794        // is enqueued onto the AUDIT worker; one
795        // `ObsAuditSpoolRecovered` self-event is emitted with the total
796        // count.
797        observer.recover_audit_spool();
798        // Spec 11 § 10 / spec 93 P1-2: announce the registry size so
799        // operators can correlate sink behaviour with the schema set
800        // currently linked into the binary.
801        let schema_count = observer.registry.len() as u64;
802        crate::self_events::emit_registry_initialized(schema_count, 0);
803        Ok(observer)
804    }
805}
806
807/// Populate a [`ResourceAttrs`] from `OTEL_SERVICE_NAME` /
808/// `OTEL_RESOURCE_ATTRIBUTES` plus the observer's own service /
809/// version / instance identity. Spec 20 § 2.1 / spec 94 § 2.7.
810fn build_resource_from_env(service: &str, version: &str, instance: &str) -> ResourceAttrs {
811    let mut r = ResourceAttrs {
812        service_name: service.to_string(),
813        service_version: version.to_string(),
814        service_instance_id: instance.to_string(),
815        ..Default::default()
816    };
817    if let Ok(name) = std::env::var("OTEL_SERVICE_NAME")
818        && !name.is_empty()
819    {
820        r.service_name = name;
821    }
822    if let Ok(extras) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") {
823        for pair in extras.split(',') {
824            let pair = pair.trim();
825            if pair.is_empty() {
826                continue;
827            }
828            if let Some((k, v)) = pair.split_once('=') {
829                let key = k.trim();
830                let val = v.trim().to_string();
831                match key {
832                    "service.name" => r.service_name = val,
833                    "service.version" => r.service_version = val,
834                    "service.namespace" => r.service_namespace = val,
835                    "service.instance.id" => r.service_instance_id = val,
836                    "deployment.environment" => r.deployment_environment = val,
837                    "host.name" => r.host_name = val,
838                    "host.arch" => r.host_arch = val,
839                    _ => {
840                        r.extra.insert(key.to_string(), val);
841                    }
842                }
843            }
844        }
845    }
846    if r.host_arch.is_empty() {
847        r.host_arch = match std::env::consts::ARCH {
848            "x86_64" => "amd64".to_string(),
849            "aarch64" => "arm64".to_string(),
850            other => other.to_string(),
851        };
852    }
853    if r.host_name.is_empty()
854        && let Ok(host) = std::env::var("HOSTNAME")
855    {
856        r.host_name = host;
857    }
858    r
859}
860
861fn config_hash(cfg: &EventsConfig) -> u64 {
862    let bytes = match serde_yaml::to_string(cfg) {
863        Ok(s) => s.into_bytes(),
864        Err(_) => return 0,
865    };
866    let h = blake3::hash(&bytes);
867    let arr: [u8; 8] = match <[u8; 8]>::try_from(&h.as_bytes()[..8]) {
868        Ok(a) => a,
869        Err(_) => return 0,
870    };
871    u64::from_le_bytes(arr)
872}
873
874fn spawn_pool(
875    router: &SinkRouter,
876    registry: &Arc<SchemaRegistry>,
877    counters: &Arc<WorkerCounters>,
878    queues: &crate::config::QueuesConfig,
879) -> WorkerPool {
880    let mut pool = WorkerPool::default();
881    if let Some(sink) = router.log.as_ref().or(router.fallback.as_ref()) {
882        pool.log = spawn_tier_worker(
883            Tier::Log,
884            queues,
885            Arc::clone(sink),
886            Arc::clone(registry),
887            Arc::clone(counters),
888        );
889    }
890    if let Some(sink) = router.metric.as_ref().or(router.fallback.as_ref()) {
891        pool.metric = spawn_tier_worker(
892            Tier::Metric,
893            queues,
894            Arc::clone(sink),
895            Arc::clone(registry),
896            Arc::clone(counters),
897        );
898    }
899    if let Some(sink) = router.trace.as_ref().or(router.fallback.as_ref()) {
900        pool.trace = spawn_tier_worker(
901            Tier::Trace,
902            queues,
903            Arc::clone(sink),
904            Arc::clone(registry),
905            Arc::clone(counters),
906        );
907    }
908    if let Some(sink) = router.audit.as_ref() {
909        pool.audit = spawn_tier_worker(
910            Tier::Audit,
911            queues,
912            Arc::clone(sink),
913            Arc::clone(registry),
914            Arc::clone(counters),
915        );
916    }
917    pool
918}
919
/// Errors returned by [`StandardObserverBuilder::build`].
///
/// The enum is `#[non_exhaustive]`, so `match`es outside this crate need a
/// wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum BuildError {
    /// Config validation failed.
    ///
    /// `#[from]` generates a `From<ConfigError>` impl, so `?` on a
    /// `Result<_, ConfigError>` converts into this variant.
    #[error("invalid config: {0}")]
    InvalidConfig(#[from] crate::config::ConfigError),
    /// AUDIT spool could not be opened.
    ///
    /// `build` produces this via `.map_err(BuildError::SpoolOpen)` when
    /// `SpoolWriter::open_with_fsync` fails. The I/O error is exposed as
    /// `source()`.
    ///
    /// NOTE(review): the message also interpolates `{0}`, so reporters that
    /// walk the `source()` chain will print the I/O error text twice.
    #[error("audit spool open failed: {0}")]
    SpoolOpen(#[source] std::io::Error),
}
931
932#[allow(dead_code)]
933fn _ensure_noop_compiles() {
934    let _: Arc<dyn Sink> = Arc::new(NoopSink);
935}
936
#[cfg(test)]
mod tests {
    use super::*;
    use crate::resource::ResourceAttrs;

    #[test]
    fn test_oversized_label_value_drops_envelope() {
        // Spec 94 § 3.5 / P2-E: a label value over
        // `limits.max_label_value_bytes` causes the envelope to be
        // dropped via `ObsOversizedDropped { reason = "label" }`.
        use obs_proto::obs::v1::{
            ObsEnvelope, SamplingReason as PSamplingReason, Severity as PSev, Tier as PTier,
        };
        let observer = StandardObserver::builder()
            .service("test", "1.0.0")
            .sink_fallback(Arc::new(NoopSink))
            .spawn_workers(false)
            .build()
            .expect("build");
        // The default cap is 1 KiB; emit a label whose value is 2 KiB.
        let mut env = ObsEnvelope {
            full_name: "test.v1.ObsBig".to_string(),
            tier: ::buffa::EnumValue::Known(PTier::TIER_LOG),
            sev: ::buffa::EnumValue::Known(PSev::SEVERITY_INFO),
            sampling_reason: ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_HEAD_RATE),
            ..Default::default()
        };
        env.labels.insert("ua".to_string(), "x".repeat(2048));
        let kept = observer.run_emit_pipeline(&mut env, obs_types::Severity::Info);
        assert!(!kept, "envelope with oversize label value must be dropped");
    }

    #[test]
    fn test_set_resource_attrs_is_visible_to_observer_callers() {
        // Spec 94 § 2.7 / P1-E: a single ArcSwap on the observer; a
        // mutation must be visible to subsequent reads.
        let observer = StandardObserver::builder()
            .service("test", "1.0.0")
            .sink_fallback(Arc::new(NoopSink))
            .spawn_workers(false)
            .build()
            .expect("build");
        let before = observer.resource_attrs();
        // `build()` folds OTEL_SERVICE_NAME / OTEL_RESOURCE_ATTRIBUTES into
        // the resource, so the builder-supplied name is only guaranteed when
        // the test environment sets neither. Check the baseline only then,
        // instead of failing spuriously on hosts that export them.
        let env_may_override_name = std::env::var_os("OTEL_SERVICE_NAME")
            .is_some_and(|v| !v.is_empty())
            || std::env::var("OTEL_RESOURCE_ATTRIBUTES").is_ok_and(|v| v.contains("service.name"));
        if !env_may_override_name {
            assert_eq!(before.service_name, "test");
        }
        observer.set_resource_attrs(ResourceAttrs {
            service_name: "rotated".to_string(),
            deployment_environment: "prod".to_string(),
            ..Default::default()
        });
        let after = observer.resource_attrs();
        assert_eq!(after.service_name, "rotated");
        assert_eq!(after.deployment_environment, "prod");
    }
}