Skip to main content

obs_core/observer/
mod.rs

//! Three-tier observer resolution + the `Observer` trait.
//!
//! Spec 11 § 3:
//!
//! - Per-task `OBSERVER_TASK` (highest priority; via `Future::with_observer` adapter, lands in spec
//!   13 § 3 / Phase 3 task 3.3).
//! - Per-thread `OBSERVER_THREAD` — `with_observer_thread_local`, `with_test_observer`,
//!   `#[obs::test]`.
//! - Global `OBSERVER_GLOBAL` — `install_observer`.
//!
//! The hot path first reads the `CAN_ENTER` re-entry guard and then
//! checks `OVERRIDE_COUNT == 0`, so a process with no active override
//! pays one thread-local read, one atomic load and one
//! `ArcSwap::load_full`. The `CAN_ENTER` cell prevents re-entry when a
//! sink synthesises an event.
15
16mod in_memory;
17mod noop;
18mod standard;
19pub(crate) mod workers;
20
21use std::{
22    cell::{Cell, RefCell},
23    sync::{
24        Arc, Weak,
25        atomic::{AtomicUsize, Ordering},
26    },
27};
28
29use arc_swap::ArcSwap;
30use obs_proto::obs::v1::ObsEnvelope;
31use once_cell::sync::Lazy;
32
33pub use self::{
34    in_memory::{InMemoryHandle, InMemoryObserver},
35    noop::NoopObserver,
36    standard::{BuildError, StandardObserver, StandardObserverBuilder},
37    workers::WorkerCounters,
38};
39use crate::callsite::ObsCallsite;
40
41/// The global observer trait. **Sealed** in spirit (downstream crates
42/// implement it freely, but the SDK ships `NoopObserver`,
43/// `InMemoryObserver`, `StandardObserver` covering 99% of cases).
44///
45/// `&self` everywhere so the same observer reference is reused across
46/// every emit; no locks taken on the hot path.
47pub trait Observer: Send + Sync + 'static {
48    /// Hot-path emit. Never blocks. Never panics. Spec 11 § 3.
49    fn emit_envelope(&self, env: ObsEnvelope);
50
51    /// Cheap callsite filter check; called only when the cached
52    /// `Interest` is `Sometimes`. Default impl returns `true`
53    /// (allows every callsite that survived `enabled_static`).
54    fn enabled(&self, callsite: &ObsCallsite) -> bool {
55        let _ = callsite;
56        true
57    }
58
59    /// Monotonic generation counter; bumped on every filter / config
60    /// change so callsite caches re-validate. Spec 11 § 3.2.
61    fn generation(&self) -> u32 {
62        0
63    }
64
65    /// Force every callsite's `interest` cache back to `Unknown`.
66    /// Default impl is a no-op for observers that don't filter.
67    fn reload_filter(&self) {}
68
69    /// Flush all sinks; await in-flight batches. Default no-op.
70    fn flush(&self) -> crate::sink::SinkFut<'_> {
71        Box::pin(async {})
72    }
73
74    /// Shutdown all sinks and join workers. Idempotent.
75    fn shutdown(&self) -> crate::sink::SinkFut<'_> {
76        Box::pin(async {})
77    }
78
79    /// Synchronous shutdown for use in panic hooks and `Drop` impls
80    /// where awaiting is not possible. Best-effort within `timeout`.
81    /// Spec 11 § 3, § 6.1.
82    fn shutdown_blocking(&self, timeout: std::time::Duration) {
83        let _ = timeout;
84    }
85
86    /// Access this observer's per-process callsite registry, when it
87    /// has one. The bridge (Direction A) writes the registry on first
88    /// sight; `ObsToTracingSink` reads it to reconstitute
89    /// `tracing::Metadata` for interned envelopes. Spec 31 § 3.2.
90    fn callsites(&self) -> Option<std::sync::Arc<crate::registry::ObsCallsiteRegistry>> {
91        None
92    }
93
94    /// Access this observer's schema registry, when it has one. Sinks
95    /// hold their own `Arc<SchemaRegistry>` from construction; this
96    /// hook lets the bridge fall back to the global observer's
97    /// registry without depending on `StandardObserver`.
98    fn schema_registry(&self) -> Option<std::sync::Arc<crate::registry::SchemaRegistry>> {
99        None
100    }
101
102    /// Snapshot of the workspace-shared `ResourceAttrs` (OTel
103    /// semantic-convention keys). Sinks call this at flush time so a
104    /// single config-watcher reload re-projects every sink. Default
105    /// returns the empty / observer-less attribute set; concrete
106    /// observers (`StandardObserver`) override. Spec 20 § 2.1 / spec
107    /// 94 § 2.7 / P1-E.
108    fn resource_attrs(&self) -> std::sync::Arc<crate::resource::ResourceAttrs> {
109        std::sync::Arc::new(crate::resource::ResourceAttrs::default())
110    }
111}
112
113// ─── Resolution slots (spec 11 § 3) ───────────────────────────────────
114
115/// Global observer slot. `ArcSwap<T>` requires `T: Sized` (arc_swap's
116/// `RefCnt` is sized-only), and `dyn Observer` is unsized, so we
117/// store `Arc<dyn Observer>` (a sized fat pointer) inside the
118/// `ArcSwap`. `load_full()` therefore returns
119/// `Arc<Arc<dyn Observer>>`; `observer()` derefs the outer `Arc` to
120/// hand back `Arc<dyn Observer>` directly. See
121/// `docs/research/spike-arcswap.md`.
122static OBSERVER_GLOBAL: Lazy<ArcSwap<Arc<dyn Observer>>> = Lazy::new(|| {
123    let initial: Arc<dyn Observer> = Arc::new(NoopObserver);
124    ArcSwap::from_pointee(initial)
125});
126
127thread_local! {
128    /// Per-thread override. `RefCell` so nested installs stack LIFO.
129    /// Mirrors tracing's `CURRENT_STATE.default`.
130    static OBSERVER_THREAD: RefCell<Option<Arc<dyn Observer>>> =
131        const { RefCell::new(None) };
132
133    /// Re-entry guard. Set to `false` while inside an `Observer::emit_envelope`
134    /// so that an inner emit (a sink synthesising an event) returns
135    /// `NoopObserver` and becomes a no-op.
136    static CAN_ENTER: Cell<bool> = const { Cell::new(true) };
137}
138
139tokio::task_local! {
140    /// Per-task override. Set by `Future::with_observer(o)` (the
141    /// `Instrumented<F>` adapter; lands in spec 13 § 3, Phase 3
142    /// task 3.3). Defining the task-local slot now lets observer()
143    /// probe it correctly even before the adapter exists, so
144    /// resolution is forward-compatible.
145    static OBSERVER_TASK: Arc<dyn Observer>;
146}
147
/// Hot-path fast flag: the number of per-thread / per-task overrides
/// that are active right now, process-wide. It is incremented when
/// an override is installed and decremented when that override is
/// removed, so `0` means "no override is currently installed
/// anywhere" (not "none was ever installed"). While it is `0` the
/// resolver skips both probes and goes straight to the global.
/// Spec 11 § 3 / spec 99 D-3.
static OVERRIDE_COUNT: AtomicUsize = AtomicUsize::new(0);
152
153/// Resolve the active observer.
154///
155/// Hot path: returns a cloned `Arc` (one atomic refcount bump) so the
156/// caller does not hold a `Guard` across `await`. With no overrides
157/// ever installed, this collapses to `OVERRIDE_COUNT == 0` test +
158/// `OBSERVER_GLOBAL.load_full()`. Spec 11 § 3.
159#[inline]
160#[must_use]
161pub fn observer() -> Arc<dyn Observer> {
162    if !CAN_ENTER.with(Cell::get) {
163        return noop_observer_arc();
164    }
165    if OVERRIDE_COUNT.load(Ordering::Relaxed) == 0 {
166        // load_full() returns Arc<Arc<dyn Observer>>; (*x).clone()
167        // extracts the inner Arc<dyn Observer> with one refcount bump.
168        let outer = OBSERVER_GLOBAL.load_full();
169        return (*outer).clone();
170    }
171    if let Ok(per_task) = OBSERVER_TASK.try_with(Clone::clone) {
172        return per_task;
173    }
174    if let Some(per_thread) = OBSERVER_THREAD.with(|c| c.borrow().clone()) {
175        return per_thread;
176    }
177    let outer = OBSERVER_GLOBAL.load_full();
178    (*outer).clone()
179}
180
181fn noop_observer_arc() -> Arc<dyn Observer> {
182    Arc::new(NoopObserver)
183}
184
185/// Install the global observer. Called once at process start.
186///
187/// The argument is a constructed observer (typically
188/// `StandardObserver::builder().build()?`); we wrap it in `Arc` and
189/// store it under `OBSERVER_GLOBAL`.
190pub fn install_observer<O: Observer>(o: O) {
191    let arc: Arc<dyn Observer> = Arc::new(o);
192    OBSERVER_GLOBAL.store(Arc::new(arc));
193}
194
195/// Install a pre-arc'd observer. Convenience for tests and patterns
196/// that already have an `Arc<dyn Observer>` (e.g. multi-tenant
197/// registries).
198pub fn install_observer_arc(o: Arc<dyn Observer>) {
199    OBSERVER_GLOBAL.store(Arc::new(o));
200}
201
202/// Dispatch one envelope through the observer with the re-entry
203/// guard held. Spec 11 § 3.1 "Re-entry and the CAN_ENTER cell".
204///
205/// All emit paths (`Emit::emit`, the `obs::emit!` macro, the
206/// `<EventBuilder>::emit` setter) route through this so a sink that
207/// synthesises a new envelope from inside `Observer::emit_envelope`
208/// sees `observer()` returning `NoopObserver` and the inner emit
209/// becomes a no-op.
210#[inline]
211pub fn enter_emit_envelope(observer: &Arc<dyn Observer>, env: ObsEnvelope) {
212    let was_in = CAN_ENTER.with(|c| c.replace(false));
213    if was_in {
214        observer.emit_envelope(env);
215    } else {
216        // Spec 11 § 10 / spec 93 P2-13: surface the re-entry drop as
217        // an `ObsSinkDropped{tier=*, reason="reentry"}` self-event so
218        // operators can spot a sink that recursively emits. The
219        // emit-on-emit path itself is still suppressed by the
220        // `was_in == false` branch above; we only fire the metric.
221        let tier = match env.tier {
222            ::buffa::EnumValue::Known(t) => match t {
223                obs_proto::obs::v1::Tier::TIER_LOG => "log",
224                obs_proto::obs::v1::Tier::TIER_METRIC => "metric",
225                obs_proto::obs::v1::Tier::TIER_TRACE => "trace",
226                obs_proto::obs::v1::Tier::TIER_AUDIT => "audit",
227                _ => "unspecified",
228            },
229            _ => "unknown",
230        };
231        crate::self_events::emit_sink_dropped(tier, "reentry");
232    }
233    CAN_ENTER.with(|c| c.set(was_in));
234}
235
236/// Weak handle for code that needs to refer to the observer without
237/// extending its lifetime — chiefly sinks that internally hold
238/// callbacks back into the observer (e.g. the future
239/// `ObsToTracingSink` re-emitting through the registered tracing
240/// dispatcher).
241#[derive(Clone)]
242pub struct WeakObserver(Weak<dyn Observer>);
243
244impl WeakObserver {
245    /// Upgrade to a strong reference. Returns `None` after
246    /// `shutdown()` has dropped the last strong reference.
247    #[must_use]
248    pub fn upgrade(&self) -> Option<Arc<dyn Observer>> {
249        self.0.upgrade()
250    }
251}
252
253impl std::fmt::Debug for WeakObserver {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        f.debug_struct("WeakObserver").finish_non_exhaustive()
256    }
257}
258
259/// Weak handle to whatever `observer()` would return right now.
260#[must_use]
261pub fn observer_weak() -> WeakObserver {
262    let strong = observer();
263    WeakObserver(Arc::downgrade(&strong))
264}
265
266/// Sync RAII guard. Sets the per-thread observer override; restores
267/// the previous value on drop. **Do not hold across `.await`** — see
268/// the warning in spec 11 § 3.1; use `Future::with_observer` (Phase 3
269/// task 3.3) for async.
270///
271/// Bumps `OVERRIDE_COUNT` on first install in this process.
272#[must_use = "the override is reverted on Drop; bind to a variable"]
273pub fn with_observer_thread_local(o: Arc<dyn Observer>) -> ThreadObserverGuard {
274    let prev = OBSERVER_THREAD.with(|c| c.borrow_mut().replace(o));
275    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
276    ThreadObserverGuard { prev }
277}
278
279/// RAII guard returned by [`with_observer_thread_local`].
280pub struct ThreadObserverGuard {
281    prev: Option<Arc<dyn Observer>>,
282}
283
284impl std::fmt::Debug for ThreadObserverGuard {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("ThreadObserverGuard")
287            .field("had_prev", &self.prev.is_some())
288            .finish()
289    }
290}
291
292impl Drop for ThreadObserverGuard {
293    fn drop(&mut self) {
294        OBSERVER_THREAD.with(|c| {
295            *c.borrow_mut() = self.prev.take();
296        });
297        OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
298    }
299}
300
301/// Test-flavored helper. Installs the observer per-thread for the
302/// duration of `f`; restores on closure exit. Used by `#[obs::test]`
303/// (Phase 2 task 2.6) and by users that want to assert what was
304/// emitted from a synchronous block.
305///
306/// Takes `Arc<dyn Observer>` so the caller can keep a clone and
307/// inspect it after the closure (e.g. `InMemoryObserver::handle()`).
308/// Spec 11 § 3.1.
309///
310/// Nested calls stack LIFO via the per-thread `RefCell` slot.
311pub fn with_test_observer<F, R>(observer: Arc<dyn Observer>, f: F) -> R
312where
313    F: FnOnce() -> R,
314{
315    let _g = with_observer_thread_local(observer);
316    f()
317}
318
319/// Async sibling of [`with_test_observer`]: install the observer in
320/// the per-task slot for the duration of `fut`. This is the version
321/// the `#[obs::test]` async expansion uses so that an emit on a
322/// migrated-tokio-worker-thread still resolves to the test observer
323/// (the per-thread slot would not be set on the destination worker).
324///
325/// `OVERRIDE_COUNT` is bumped before entering the scope and
326/// decremented after, so the hot-path resolver actually probes the
327/// task-local. Spec 11 § 3.1, KD-D3.
328pub async fn with_observer_task<F, R>(observer: Arc<dyn Observer>, fut: F) -> R
329where
330    F: std::future::Future<Output = R>,
331{
332    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
333    let result = OBSERVER_TASK.scope(observer, fut).await;
334    OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
335    result
336}
337
338/// Synchronous sibling of [`with_observer_task`] used by
339/// `Instrumented<F>::poll` so a single poll can bind / unbind the
340/// per-task observer without requiring an `await`. Spec 13 § 3.
341pub fn with_observer_task_sync<F, R>(observer: Arc<dyn Observer>, f: F) -> R
342where
343    F: FnOnce() -> R,
344{
345    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
346    let result = OBSERVER_TASK.sync_scope(observer, f);
347    OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
348    result
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observer::in_memory::InMemoryObserver;

    /// Smoke test: with nothing installed by this test, resolution
    /// still yields a usable observer.
    #[test]
    fn test_should_default_to_noop() {
        // The default installation is `NoopObserver`. It is not easy
        // to identify by type-id, so this only confirms that
        // `observer()` hands back a live `Arc`.
        let o = observer();
        assert!(Arc::strong_count(&o) >= 1);
    }

    /// The per-thread override is what `observer()` resolves to
    /// inside `with_test_observer`, and it is gone afterwards.
    #[test]
    fn test_with_test_observer_should_capture() {
        let in_memory = InMemoryObserver::new();
        let handle = in_memory.handle();
        let installed: Arc<dyn Observer> = Arc::new(in_memory);

        // Resolve from inside the scope and compare allocations.
        let resolved = with_test_observer(Arc::clone(&installed), observer);
        assert!(Arc::ptr_eq(&resolved, &installed));

        // Once the scope has ended the override must be reverted.
        assert!(!Arc::ptr_eq(&observer(), &installed));

        // The InMemoryObserver in this Phase-1 shell does not fire
        // sinks unless the user calls emit_envelope manually; this
        // test is about resolution, not the full pipeline.
        let _ = handle;
    }
}