mod in_memory;
mod noop;
mod standard;
pub(crate) mod workers;
use std::{
cell::{Cell, RefCell},
sync::{
Arc, Weak,
atomic::{AtomicUsize, Ordering},
},
};
use arc_swap::ArcSwap;
use obs_proto::obs::v1::ObsEnvelope;
use once_cell::sync::Lazy;
pub use self::{
in_memory::{InMemoryHandle, InMemoryObserver},
noop::NoopObserver,
standard::{BuildError, StandardObserver, StandardObserverBuilder},
workers::WorkerCounters,
};
use crate::callsite::ObsCallsite;
pub trait Observer: Send + Sync + 'static {
fn emit_envelope(&self, env: ObsEnvelope);
fn enabled(&self, callsite: &ObsCallsite) -> bool {
let _ = callsite;
true
}
fn generation(&self) -> u32 {
0
}
fn reload_filter(&self) {}
fn flush(&self) -> crate::sink::SinkFut<'_> {
Box::pin(async {})
}
fn shutdown(&self) -> crate::sink::SinkFut<'_> {
Box::pin(async {})
}
fn shutdown_blocking(&self, timeout: std::time::Duration) {
let _ = timeout;
}
fn callsites(&self) -> Option<std::sync::Arc<crate::registry::ObsCallsiteRegistry>> {
None
}
fn schema_registry(&self) -> Option<std::sync::Arc<crate::registry::SchemaRegistry>> {
None
}
fn resource_attrs(&self) -> std::sync::Arc<crate::resource::ResourceAttrs> {
std::sync::Arc::new(crate::resource::ResourceAttrs::default())
}
}
/// Process-wide observer slot.
///
/// Starts out holding a `NoopObserver` and is overwritten by
/// `install_observer` / `install_observer_arc`.
static OBSERVER_GLOBAL: Lazy<ArcSwap<Arc<dyn Observer>>> = Lazy::new(|| {
let initial: Arc<dyn Observer> = Arc::new(NoopObserver);
ArcSwap::from_pointee(initial)
});
thread_local! {
// Per-thread observer override; `None` until `with_observer_thread_local`
// stores one, and restored when the returned guard is dropped.
static OBSERVER_THREAD: RefCell<Option<Arc<dyn Observer>>> =
const { RefCell::new(None) };
// Re-entrancy flag: `enter_emit_envelope` sets it to `false` for the
// duration of the call, and `observer()` returns a no-op observer while
// it is `false`.
static CAN_ENTER: Cell<bool> = const { Cell::new(true) };
}
tokio::task_local! {
// Per-task observer override, bound by `with_observer_task` and
// `with_observer_task_sync`.
static OBSERVER_TASK: Arc<dyn Observer>;
}
/// Number of currently active thread/task overrides across the process.
///
/// `observer()` skips the task-local and thread-local lookups entirely while
/// this is zero.
static OVERRIDE_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Resolves the observer that applies to the calling context.
///
/// Resolution order:
/// 1. a fresh no-op observer when the thread is already inside
///    `enter_emit_envelope` (`CAN_ENTER` is `false`);
/// 2. the task-local override, then the thread-local override — both are
///    consulted only while `OVERRIDE_COUNT` is non-zero;
/// 3. the globally installed observer.
#[inline]
#[must_use]
pub fn observer() -> Arc<dyn Observer> {
    let may_enter = CAN_ENTER.with(Cell::get);
    if !may_enter {
        return noop_observer_arc();
    }

    // With no override active anywhere, skip the task/thread lookups.
    let overridden = if OVERRIDE_COUNT.load(Ordering::Relaxed) == 0 {
        None
    } else {
        OBSERVER_TASK
            .try_with(|bound| Arc::clone(bound))
            .ok()
            .or_else(|| OBSERVER_THREAD.with(|slot| slot.borrow().clone()))
    };

    match overridden {
        Some(chosen) => chosen,
        None => {
            let global = OBSERVER_GLOBAL.load_full();
            Arc::clone(&*global)
        }
    }
}
/// Builds a new, independently allocated no-op observer handle.
fn noop_observer_arc() -> Arc<dyn Observer> {
    let noop: Arc<dyn Observer> = Arc::new(NoopObserver);
    noop
}
pub fn install_observer<O: Observer>(o: O) {
let arc: Arc<dyn Observer> = Arc::new(o);
OBSERVER_GLOBAL.store(Arc::new(arc));
}
/// Installs an already shared observer as the process-wide observer,
/// replacing whatever was stored in the global slot before.
pub fn install_observer_arc(o: Arc<dyn Observer>) {
    // The global slot stores an `Arc<Arc<dyn Observer>>`, hence the extra wrap.
    let slot_value = Arc::new(o);
    OBSERVER_GLOBAL.store(slot_value);
}
/// Delivers `env` to `observer` while guarding against re-entrant emission
/// on the current thread.
///
/// The thread-local `CAN_ENTER` flag is cleared for the duration of the call.
/// If it was already cleared — i.e. this call is nested inside another
/// emission on the same thread — the envelope is not delivered; instead a
/// "sink dropped" self-event is recorded with reason `"reentry"` and the
/// envelope's tier (`"unknown"` when the tier is not a known enum value).
///
/// The previous flag value is restored when the function exits, including
/// when `emit_envelope` panics and the panic unwinds through this frame.
#[inline]
pub fn enter_emit_envelope(observer: &Arc<dyn Observer>, env: ObsEnvelope) {
    /// Puts the saved `CAN_ENTER` value back when dropped.
    struct RestoreCanEnter(bool);

    impl Drop for RestoreCanEnter {
        fn drop(&mut self) {
            CAN_ENTER.with(|flag| flag.set(self.0));
        }
    }

    let may_enter = CAN_ENTER.with(|flag| flag.replace(false));
    // Restoring through a guard (rather than a trailing statement) keeps a
    // panicking observer from leaving the flag stuck at `false`, which would
    // make every later `observer()` call on this thread return the no-op.
    let _restore = RestoreCanEnter(may_enter);

    if may_enter {
        observer.emit_envelope(env);
    } else {
        let tier = match env.tier {
            ::buffa::EnumValue::Known(t) => t.as_str(),
            _ => "unknown",
        };
        crate::self_events::emit_sink_dropped(tier, "reentry");
    }
}
/// Non-owning handle to an observer.
///
/// Holding a `WeakObserver` does not keep the observer alive; call
/// [`WeakObserver::upgrade`] to obtain a strong handle when one is needed.
#[derive(Clone)]
pub struct WeakObserver(Weak<dyn Observer>);

impl WeakObserver {
    /// Tries to obtain a strong handle.
    ///
    /// Returns `None` once every strong reference to the observer is gone.
    #[must_use]
    pub fn upgrade(&self) -> Option<Arc<dyn Observer>> {
        Weak::upgrade(&self.0)
    }
}

impl std::fmt::Debug for WeakObserver {
    /// Prints only the type name; the wrapped pointer is deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WeakObserver");
        out.finish_non_exhaustive()
    }
}
/// Returns a weak handle to whatever `observer()` currently resolves to.
///
/// The strong reference obtained for the lookup is released before this
/// function returns, so the result only upgrades while something else keeps
/// the observer alive.
#[must_use]
pub fn observer_weak() -> WeakObserver {
    WeakObserver(Arc::downgrade(&observer()))
}
/// Overrides the observer for the current thread until the returned guard is
/// dropped.
///
/// Any override that was already active on this thread is remembered inside
/// the guard and put back when the guard is dropped.
#[must_use = "the override is reverted on Drop; bind to a variable"]
pub fn with_observer_thread_local(o: Arc<dyn Observer>) -> ThreadObserverGuard {
    let displaced = OBSERVER_THREAD.with(|slot| slot.replace(Some(o)));
    // Tell `observer()` that at least one override now exists.
    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
    ThreadObserverGuard { prev: displaced }
}
/// Guard returned by `with_observer_thread_local`.
///
/// Dropping it restores the thread-local override that was active before the
/// guard was created (possibly none) and releases one unit of
/// `OVERRIDE_COUNT`.
pub struct ThreadObserverGuard {
    /// Override that was in place before this guard's observer was installed.
    prev: Option<Arc<dyn Observer>>,
}

impl std::fmt::Debug for ThreadObserverGuard {
    /// Shows only whether a previous override is being held for restoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ThreadObserverGuard")
            .field("had_prev", &self.prev.is_some())
            .finish()
    }
}

impl Drop for ThreadObserverGuard {
    fn drop(&mut self) {
        let restored = self.prev.take();
        // Move the displaced override out of the cell instead of overwriting
        // it in place: `RefCell::replace` ends its mutable borrow before
        // returning, so if dropping the observer runs a destructor that calls
        // `observer()`, the thread-local slot can be borrowed again without
        // panicking.
        let displaced = OBSERVER_THREAD.with(|slot| slot.replace(restored));
        drop(displaced);
        OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Runs `f` with `observer` installed as the current thread's override and
/// returns whatever `f` returns.
///
/// The override is removed again once `f` has finished.
pub fn with_test_observer<F, R>(observer: Arc<dyn Observer>, f: F) -> R
where
    F: FnOnce() -> R,
{
    let override_guard = with_observer_thread_local(observer);
    let outcome = f();
    drop(override_guard);
    outcome
}
/// Drives `fut` to completion with `observer` bound as the task-local
/// override, and returns the future's output.
///
/// `OVERRIDE_COUNT` is incremented for the lifetime of the scope. The matching
/// decrement is performed by a drop guard, so it also happens when the
/// returned future is dropped before completion (cancellation) or when `fut`
/// panics — otherwise the counter would stay elevated for the rest of the
/// process and `observer()` would never take its no-override fast path again.
pub async fn with_observer_task<F, R>(observer: Arc<dyn Observer>, fut: F) -> R
where
    F: std::future::Future<Output = R>,
{
    /// Releases one unit of `OVERRIDE_COUNT` when dropped.
    struct OverrideCountGuard;

    impl Drop for OverrideCountGuard {
        fn drop(&mut self) {
            OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
        }
    }

    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
    // Declared before the scoped future so it is dropped after it: the count
    // stays non-zero while the inner future is being torn down.
    let _count = OverrideCountGuard;
    OBSERVER_TASK.scope(observer, fut).await
}
/// Runs the synchronous closure `f` with `observer` bound as the task-local
/// override, and returns the closure's result.
///
/// `OVERRIDE_COUNT` is incremented while `f` runs. The matching decrement is
/// performed by a drop guard so that a panic inside `f` (for example a failed
/// assertion in a test) does not leave the counter permanently elevated.
pub fn with_observer_task_sync<F, R>(observer: Arc<dyn Observer>, f: F) -> R
where
    F: FnOnce() -> R,
{
    /// Releases one unit of `OVERRIDE_COUNT` when dropped.
    struct OverrideCountGuard;

    impl Drop for OverrideCountGuard {
        fn drop(&mut self) {
            OVERRIDE_COUNT.fetch_sub(1, Ordering::Relaxed);
        }
    }

    OVERRIDE_COUNT.fetch_add(1, Ordering::Relaxed);
    let _count = OverrideCountGuard;
    OBSERVER_TASK.sync_scope(observer, f)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observer::in_memory::InMemoryObserver;

    /// Data address of the observer behind `o`, with the vtable half of the
    /// fat pointer discarded so identity checks compare allocations only.
    fn data_addr(o: &Arc<dyn Observer>) -> *const () {
        Arc::as_ptr(o).cast::<()>()
    }

    #[test]
    fn test_should_default_to_noop() {
        // NOTE(review): this only checks that resolution yields a live
        // handle; it cannot tell a no-op observer from any other one.
        let o = observer();
        assert!(Arc::strong_count(&o) >= 1);
    }

    #[test]
    fn test_with_test_observer_should_capture() {
        let in_memory = InMemoryObserver::new();
        let handle = in_memory.handle();
        let installed: Arc<dyn Observer> = Arc::new(in_memory);

        let returned = with_test_observer(Arc::clone(&installed), || {
            // Inside the scope, lookups on this thread must resolve to the
            // observer that was just installed.
            assert_eq!(data_addr(&observer()), data_addr(&installed));
            7_u8
        });

        // The closure's result is passed through unchanged.
        assert_eq!(returned, 7);
        // Once the scope ends the override must no longer be visible.
        assert_ne!(data_addr(&observer()), data_addr(&installed));
        let _ = handle;
    }
}