obs_core/observer/mod.rs
//! Three-tier observer resolution + the `Observer` trait.
//!
//! Spec 11 § 3:
//!
//! - Per-task `OBSERVER_TASK` (highest priority; via the `Future::with_observer` adapter, which
//!   lands in spec 13 § 3 / Phase 3 task 3.3).
//! - Per-thread `OBSERVER_THREAD` — `with_observer_thread_local`, `with_test_observer`,
//!   `#[obs::test]`.
//! - Global `OBSERVER_GLOBAL` — `install_observer`.
//!
//! The hot path first reads the thread-local `CAN_ENTER` cell, which
//! prevents re-entry when a sink synthesises an event, and then checks
//! `OVERRIDE_COUNT == 0`, so a process with no active override pays one
//! thread-local read, one atomic load and one `ArcSwap::load_full`.
15
16mod in_memory;
17mod noop;
18mod standard;
19pub(crate) mod workers;
20
21use std::{
22 cell::{Cell, RefCell},
23 sync::{
24 Arc, Weak,
25 atomic::{AtomicUsize, Ordering},
26 },
27};
28
29use arc_swap::ArcSwap;
30use obs_proto::obs::v1::ObsEnvelope;
31use once_cell::sync::Lazy;
32
33pub use self::{
34 in_memory::{InMemoryHandle, InMemoryObserver},
35 noop::NoopObserver,
36 standard::{BuildError, StandardObserver, StandardObserverBuilder},
37 workers::WorkerCounters,
38};
39use crate::callsite::ObsCallsite;
40
41/// The global observer trait. **Sealed** in spirit (downstream crates
42/// implement it freely, but the SDK ships `NoopObserver`,
43/// `InMemoryObserver`, `StandardObserver` covering 99% of cases).
44///
45/// `&self` everywhere so the same observer reference is reused across
46/// every emit; no locks taken on the hot path.
47pub trait Observer: Send + Sync + 'static {
48 /// Hot-path emit. Never blocks. Never panics. Spec 11 § 3.
49 fn emit_envelope(&self, env: ObsEnvelope);
50
51 /// Cheap callsite filter check; called only when the cached
52 /// `Interest` is `Sometimes`. Default impl returns `true`
53 /// (allows every callsite that survived `enabled_static`).
54 fn enabled(&self, callsite: &ObsCallsite) -> bool {
55 let _ = callsite;
56 true
57 }
58
59 /// Monotonic generation counter; bumped on every filter / config
60 /// change so callsite caches re-validate. Spec 11 § 3.2.
61 fn generation(&self) -> u32 {
62 0
63 }
64
65 /// Force every callsite's `interest` cache back to `Unknown`.
66 /// Default impl is a no-op for observers that don't filter.
67 fn reload_filter(&self) {}
68
69 /// Flush all sinks; await in-flight batches. Default no-op.
70 fn flush(&self) -> crate::sink::SinkFut<'_> {
71 Box::pin(async {})
72 }
73
74 /// Shutdown all sinks and join workers. Idempotent.
75 fn shutdown(&self) -> crate::sink::SinkFut<'_> {
76 Box::pin(async {})
77 }
78
79 /// Synchronous shutdown for use in panic hooks and `Drop` impls
80 /// where awaiting is not possible. Best-effort within `timeout`.
81 /// Spec 11 § 3, § 6.1.
82 fn shutdown_blocking(&self, timeout: std::time::Duration) {
83 let _ = timeout;
84 }
85
86 /// Access this observer's per-process callsite registry, when it
87 /// has one. The bridge (Direction A) writes the registry on first
88 /// sight; `ObsToTracingSink` reads it to reconstitute
89 /// `tracing::Metadata` for interned envelopes. Spec 31 § 3.2.
90 fn callsites(&self) -> Option<std::sync::Arc<crate::registry::ObsCallsiteRegistry>> {
91 None
92 }
93
94 /// Access this observer's schema registry, when it has one. Sinks
95 /// hold their own `Arc<SchemaRegistry>` from construction; this
96 /// hook lets the bridge fall back to the global observer's
97 /// registry without depending on `StandardObserver`.
98 fn schema_registry(&self) -> Option<std::sync::Arc<crate::registry::SchemaRegistry>> {
99 None
100 }
101
102 /// Snapshot of the workspace-shared `ResourceAttrs` (OTel
103 /// semantic-convention keys). Sinks call this at flush time so a
104 /// single config-watcher reload re-projects every sink. Default
105 /// returns the empty / observer-less attribute set; concrete
106 /// observers (`StandardObserver`) override. Spec 20 § 2.1 / spec
107 /// 94 § 2.7 / P1-E.
108 fn resource_attrs(&self) -> std::sync::Arc<crate::resource::ResourceAttrs> {
109 std::sync::Arc::new(crate::resource::ResourceAttrs::default())
110 }
111}
112
113// ─── Resolution slots (spec 11 § 3) ───────────────────────────────────
114
115/// Global observer slot. `ArcSwap<T>` requires `T: Sized` (arc_swap's
116/// `RefCnt` is sized-only), and `dyn Observer` is unsized, so we
117/// store `Arc<dyn Observer>` (a sized fat pointer) inside the
118/// `ArcSwap`. `load_full()` therefore returns
119/// `Arc<Arc<dyn Observer>>`; `observer()` derefs the outer `Arc` to
120/// hand back `Arc<dyn Observer>` directly. See
121/// `docs/research/spike-arcswap.md`.
122static OBSERVER_GLOBAL: Lazy<ArcSwap<Arc<dyn Observer>>> = Lazy::new(|| {
123 let initial: Arc<dyn Observer> = Arc::new(NoopObserver);
124 ArcSwap::from_pointee(initial)
125});
126
127thread_local! {
128 /// Per-thread override. `RefCell` so nested installs stack LIFO.
129 /// Mirrors tracing's `CURRENT_STATE.default`.
130 static OBSERVER_THREAD: RefCell<Option<Arc<dyn Observer>>> =
131 const { RefCell::new(None) };
132
133 /// Re-entry guard. Set to `false` while inside an `Observer::emit_envelope`
134 /// so that an inner emit (a sink synthesising an event) returns
135 /// `NoopObserver` and becomes a no-op.
136 static CAN_ENTER: Cell<bool> = const { Cell::new(true) };
137}
138
139tokio::task_local! {
140 /// Per-task override. Set by `Future::with_observer(o)` (the
141 /// `Instrumented<F>` adapter; lands in spec 13 § 3, Phase 3
142 /// task 3.3). Defining the task-local slot now lets observer()
143 /// probe it correctly even before the adapter exists, so
144 /// resolution is forward-compatible.
145 static OBSERVER_TASK: Arc<dyn Observer>;
146}
147
/// Number of per-thread / per-task overrides currently active in this
/// process. Each install increments it and each release decrements it.
/// While it reads `0` the resolver skips both override probes and goes
/// straight to the global slot. Spec 11 § 3 / spec 99 D-3.
static OVERRIDE_COUNT: AtomicUsize = AtomicUsize::new(0);
152
153/// Resolve the active observer.
154///
155/// Hot path: returns a cloned `Arc` (one atomic refcount bump) so the
156/// caller does not hold a `Guard` across `await`. With no overrides
157/// ever installed, this collapses to `OVERRIDE_COUNT == 0` test +
158/// `OBSERVER_GLOBAL.load_full()`. Spec 11 § 3.
159#[inline]
160#[must_use]
161pub fn observer() -> Arc<dyn Observer> {
162 if !CAN_ENTER.with(Cell::get) {
163 return noop_observer_arc();
164 }
165 if OVERRIDE_COUNT.load(Ordering::Relaxed) == 0 {
166 // load_full() returns Arc<Arc<dyn Observer>>; (*x).clone()
167 // extracts the inner Arc<dyn Observer> with one refcount bump.
168 let outer = OBSERVER_GLOBAL.load_full();
169 return (*outer).clone();
170 }
171 if let Ok(per_task) = OBSERVER_TASK.try_with(Clone::clone) {
172 return per_task;
173 }
174 if let Some(per_thread) = OBSERVER_THREAD.with(|c| c.borrow().clone()) {
175 return per_thread;
176 }
177 let outer = OBSERVER_GLOBAL.load_full();
178 (*outer).clone()
179}
180
181fn noop_observer_arc() -> Arc<dyn Observer> {
182 Arc::new(NoopObserver)
183}
184
185/// Install the global observer. Called once at process start.
186///
187/// The argument is a constructed observer (typically
188/// `StandardObserver::builder().build()?`); we wrap it in `Arc` and
189/// store it under `OBSERVER_GLOBAL`.
190pub fn install_observer<O: Observer>(o: O) {
191 let arc: Arc<dyn Observer> = Arc::new(o);
192 OBSERVER_GLOBAL.store(Arc::new(arc));
193}
194
195/// Install a pre-arc'd observer. Convenience for tests and patterns
196/// that already have an `Arc<dyn Observer>` (e.g. multi-tenant
197/// registries).
198pub fn install_observer_arc(o: Arc<dyn Observer>) {
199 OBSERVER_GLOBAL.store(Arc::new(o));
200}
201
202/// Dispatch one envelope through the observer with the re-entry
203/// guard held. Spec 11 § 3.1 "Re-entry and the CAN_ENTER cell".
204///
205/// All emit paths (`Emit::emit`, the `obs::emit!` macro, the
206/// `<EventBuilder>::emit` setter) route through this so a sink that
207/// synthesises a new envelope from inside `Observer::emit_envelope`
208/// sees `observer()` returning `NoopObserver` and the inner emit
209/// becomes a no-op.
210#[inline]
211pub fn enter_emit_envelope(observer: &Arc<dyn Observer>, env: ObsEnvelope) {
212 let was_in = CAN_ENTER.with(|c| c.replace(false));
213 if was_in {
214 observer.emit_envelope(env);
215 } else {
216 // Spec 11 § 10 / spec 93 P2-13: surface the re-entry drop as
217 // an `ObsSinkDropped{tier=*, reason="reentry"}` self-event so
218 // operators can spot a sink that recursively emits. The
219 // emit-on-emit path itself is still suppressed by the
220 // `was_in == false` branch above; we only fire the metric.
221 let tier = match env.tier {
222 ::buffa::EnumValue::Known(t) => match t {
223 obs_proto::obs::v1::Tier::TIER_LOG => "log",
224 obs_proto::obs::v1::Tier::TIER_METRIC => "metric",
225 obs_proto::obs::v1::Tier::TIER_TRACE => "trace",
226 obs_proto::obs::v1::Tier::TIER_AUDIT => "audit",
227 _ => "unspecified",
228 },
229 _ => "unknown",
230 };
231 crate::self_events::emit_sink_dropped(tier, "reentry");
232 }
233 CAN_ENTER.with(|c| c.set(was_in));
234}
235
236/// Weak handle for code that needs to refer to the observer without
237/// extending its lifetime — chiefly sinks that internally hold
238/// callbacks back into the observer (e.g. the future
239/// `ObsToTracingSink` re-emitting through the registered tracing
240/// dispatcher).
241#[derive(Clone)]
242pub struct WeakObserver(Weak<dyn Observer>);
243
244impl WeakObserver {
245 /// Upgrade to a strong reference. Returns `None` after
246 /// `shutdown()` has dropped the last strong reference.
247 #[must_use]
248 pub fn upgrade(&self) -> Option<Arc<dyn Observer>> {
249 self.0.upgrade()
250 }
251}
252
253impl std::fmt::Debug for WeakObserver {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 f.debug_struct("WeakObserver").finish_non_exhaustive()
256 }
257}
258
259/// Weak handle to whatever `observer()` would return right now.
260#[must_use]
261pub fn observer_weak() -> WeakObserver {
262 let strong = observer();
263 WeakObserver(Arc::downgrade(&strong))
264}
265
266/// Sync RAII guard. Sets the per-thread observer override; restores
267/// the previous value on drop. **Do not hold across `.await`** — see
268/// the warning in spec 11 § 3.1; use `Future::with_observer` (Phase 3
269/// task 3.3) for async.
270///
271/// Bumps `OVERRIDE_COUNT` on first install in this process.
272#[must_use = "the override is reverted on Drop; bind to a variable"]
273pub fn with_observer_thread_local(o: Arc<dyn Observer>) -> ThreadObserverGuard {
274 let prev = OBSERVER_THREAD.with(|c| c.borrow_mut().replace(o));
275 OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
276 ThreadObserverGuard { prev }
277}
278
279/// RAII guard returned by [`with_observer_thread_local`].
280pub struct ThreadObserverGuard {
281 prev: Option<Arc<dyn Observer>>,
282}
283
284impl std::fmt::Debug for ThreadObserverGuard {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("ThreadObserverGuard")
287 .field("had_prev", &self.prev.is_some())
288 .finish()
289 }
290}
291
292impl Drop for ThreadObserverGuard {
293 fn drop(&mut self) {
294 OBSERVER_THREAD.with(|c| {
295 *c.borrow_mut() = self.prev.take();
296 });
297 OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
298 }
299}
300
301/// Test-flavored helper. Installs the observer per-thread for the
302/// duration of `f`; restores on closure exit. Used by `#[obs::test]`
303/// (Phase 2 task 2.6) and by users that want to assert what was
304/// emitted from a synchronous block.
305///
306/// Takes `Arc<dyn Observer>` so the caller can keep a clone and
307/// inspect it after the closure (e.g. `InMemoryObserver::handle()`).
308/// Spec 11 § 3.1.
309///
310/// Nested calls stack LIFO via the per-thread `RefCell` slot.
311pub fn with_test_observer<F, R>(observer: Arc<dyn Observer>, f: F) -> R
312where
313 F: FnOnce() -> R,
314{
315 let _g = with_observer_thread_local(observer);
316 f()
317}
318
319/// Async sibling of [`with_test_observer`]: install the observer in
320/// the per-task slot for the duration of `fut`. This is the version
321/// the `#[obs::test]` async expansion uses so that an emit on a
322/// migrated-tokio-worker-thread still resolves to the test observer
323/// (the per-thread slot would not be set on the destination worker).
324///
325/// `OVERRIDE_COUNT` is bumped before entering the scope and
326/// decremented after, so the hot-path resolver actually probes the
327/// task-local. Spec 11 § 3.1, KD-D3.
328pub async fn with_observer_task<F, R>(observer: Arc<dyn Observer>, fut: F) -> R
329where
330 F: std::future::Future<Output = R>,
331{
332 OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
333 let result = OBSERVER_TASK.scope(observer, fut).await;
334 OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
335 result
336}
337
338/// Synchronous sibling of [`with_observer_task`] used by
339/// `Instrumented<F>::poll` so a single poll can bind / unbind the
340/// per-task observer without requiring an `await`. Spec 13 § 3.
341pub fn with_observer_task_sync<F, R>(observer: Arc<dyn Observer>, f: F) -> R
342where
343 F: FnOnce() -> R,
344{
345 OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
346 let result = OBSERVER_TASK.sync_scope(observer, f);
347 OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
348 result
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observer::in_memory::InMemoryObserver;

    /// Smoke test: resolution succeeds. The default observer cannot
    /// easily be identified by type, so this only checks that a live
    /// `Arc` comes back.
    #[test]
    fn test_should_default_to_noop() {
        let resolved = observer();
        let strong = Arc::strong_count(&resolved);
        assert!(strong >= 1);
    }

    /// Exercises install / restore of the per-thread override. This is
    /// about resolution only: nothing is emitted inside the closure, so
    /// the handle is not inspected.
    #[test]
    fn test_with_test_observer_should_capture() {
        let in_memory = InMemoryObserver::new();
        let handle = in_memory.handle();
        let shared: Arc<dyn Observer> = Arc::new(in_memory);
        // The closure runs with the thread-local override set.
        with_test_observer(shared, || {});
        let _ = handle;
    }
}