#![deny(missing_docs)]
//! Inter-event timing metrics on top of `tracing` spans and events.
//!
//! Spans and events are mapped to groups (via [`SpanGroup`] / [`EventGroup`]),
//! and the time elapsed between consecutive events within a span is recorded
//! into one HDR histogram per (span group, event group) pair. The write path
//! uses per-thread recorders behind a sharded lock to stay cheap; readers
//! access the merged histograms through [`TimingSubscriber::with_histograms`].
use crossbeam::channel;
use crossbeam::sync::ShardedLock;
use hdrhistogram::{sync::Recorder, SyncHistogram};
use indexmap::IndexMap;
use slab::Slab;
use std::cell::{RefCell, UnsafeCell};
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::{atomic, Mutex};
use tracing_core::*;
/// The hash builder used for all maps in this crate (fxhash: fast, not
/// DoS-resistant — keys here are not attacker-controlled).
pub type Hasher = fxhash::FxBuildHasher;
/// A `HashMap` that uses the crate-wide [`Hasher`].
pub type HashMap<K, V> = std::collections::HashMap<K, V, Hasher>;
// Global counter that hands out a compact, process-unique id per thread
// (see `ThreadId::default`).
static TID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
thread_local! {
    // Stack of span ids this thread has entered (innermost span is last).
    static SPAN: RefCell<Vec<span::Id>> = RefCell::new(Vec::new());
    // This thread's compact id, lazily allocated from `TID`.
    static MYTID: RefCell<Option<usize>> = RefCell::new(None);
}
mod builder;
pub use builder::Builder;
pub use hdrhistogram::Histogram;
/// Span and event grouping strategies (e.g. the [`group::ByName`] and
/// [`group::ByMessage`] defaults used by [`TimingSubscriber`]).
pub mod group;
/// Per-span bookkeeping captured when the span is created.
#[derive(Debug, Clone)]
struct SpanGroupContext<S> {
    // The group this span was assigned to by the `SpanGroup`.
    group: S,
    // The explicit parent, or the span that was current on the creating
    // thread; events are recorded against this whole ancestor chain.
    parent: Option<span::Id>,
    // Span set via `record_follows_from`. NOTE(review): stored but never read
    // anywhere in this file — presumably reserved for future use; confirm.
    follows: Option<span::Id>,
    // Static callsite metadata, needed to answer `current_span`.
    meta: &'static Metadata<'static>,
}
// Two-level map: outer key is the span group, inner key is the event group.
type Map<S, E, T> = HashMap<S, HashMap<E, T>>;
/// Assigns each new span to a group.
///
/// All spans that map to the same `Id` contribute to the same set of
/// histograms (one per event group).
pub trait SpanGroup {
    /// The type used to identify a span group.
    type Id;
    /// Compute the group for a newly created span from its attributes.
    fn group(&self, span: &span::Attributes) -> Self::Id;
}
/// Assigns each event to a group.
///
/// Within a span group, all events that map to the same `Id` are recorded
/// into the same histogram.
pub trait EventGroup {
    /// The type used to identify an event group.
    type Id;
    /// Compute the group for an event.
    fn group(&self, event: &Event) -> Self::Id;
}
// Maps a span id back to its slot in the `WriterState` slabs. Ids are handed
// out as slab index + 1 (see `new_span`), since span ids must be non-zero.
fn span_id_to_slab_idx(span: &span::Id) -> usize {
    let raw = span.into_u64() as usize;
    raw - 1
}
/// State touched on the (hot) event-recording path, behind a `ShardedLock`.
///
/// The three slabs are index-aligned: a span's slot in each is
/// `span_id_to_slab_idx(&id)`.
struct WriterState<S: Hash + Eq, E: Hash + Eq> {
    // Timestamp (from `quanta::Clock`) of the most recent event in each
    // live span; initialized to the span's creation time.
    last_event: Slab<atomic::AtomicU64>,
    // Reference count per live span, driven by `clone_span` / `try_close`.
    refcount: Slab<atomic::AtomicUsize>,
    // Per-span group/parent/metadata context.
    spans: Slab<SpanGroupContext<S>>,
    // Each thread's own histogram recorders so the hot path writes without
    // cross-thread contention (see `TimingSubscriber::time`).
    tls: ThreadLocal<Map<S, E, Recorder<u64>>>,
    // Idle recorders from which a thread mints its own `Recorder` the first
    // time it records for a given (span group, event group) pair.
    idle_recorders: Map<S, E, hdrhistogram::sync::IdleRecorder<Recorder<u64>, u64>>,
    // Hands each newly created histogram over to the `ReaderState`.
    created: channel::Sender<(S, E, SyncHistogram<u64>)>,
    // Builds the histogram for a not-yet-seen (span group, event group) pair.
    new_histogram: Box<dyn FnMut(&S, &E) -> Histogram<u64> + Send + Sync>,
}
impl<S, E> std::fmt::Debug for WriterState<S, E>
where
    S: Hash + Eq + std::fmt::Debug,
    E: Hash + Eq + std::fmt::Debug,
{
    /// Debug-formats the writer-side state.
    ///
    /// `tls` and `new_histogram` are intentionally omitted: the former holds
    /// `UnsafeCell`s and the latter is an opaque closure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("WriterState")
            .field("last_event", &self.last_event)
            // Bug fix: this field was mislabeled "refcountn" (trailing 'n').
            .field("refcount", &self.refcount)
            .field("spans", &self.spans)
            .field("idle_recorders", &self.idle_recorders)
            .field("created", &self.created)
            .finish()
    }
}
/// State used when reading histograms back out, behind a `Mutex`.
#[derive(Debug)]
struct ReaderState<S: Hash + Eq, E: Hash + Eq> {
    // Receives every histogram the write path creates; drained lazily in
    // `with_histograms`.
    created: channel::Receiver<(S, E, SyncHistogram<u64>)>,
    // All histograms seen so far, keyed by span group then event group.
    // `IndexMap` preserves creation order for the inner map.
    histograms: HashMap<S, IndexMap<E, SyncHistogram<u64>, Hasher>>,
}
/// A `tracing` [`Subscriber`] that records the time between consecutive
/// events within each span into HDR histograms.
///
/// Spans are bucketed by `S` ([`SpanGroup`]) and events by `E`
/// ([`EventGroup`]); each (span group, event group) pair gets one histogram,
/// accessible through [`TimingSubscriber::with_histograms`].
pub struct TimingSubscriber<S = group::ByName, E = group::ByMessage>
where
    S: SpanGroup,
    E: EventGroup,
    S::Id: Hash + Eq,
    E::Id: Hash + Eq,
{
    // Maps spans to their group.
    span_group: S,
    // Maps events to their group.
    event_group: E,
    // High-resolution clock used to timestamp events.
    time: quanta::Clock,
    // Hot-path state; read-locked for recording, write-locked for setup.
    writers: ShardedLock<WriterState<S::Id, E::Id>>,
    // Read-path state; see `with_histograms`.
    reader: Mutex<ReaderState<S::Id, E::Id>>,
}
impl<S, E> std::fmt::Debug for TimingSubscriber<S, E>
where
    S: SpanGroup + std::fmt::Debug,
    E: EventGroup + std::fmt::Debug,
    S::Id: Hash + Eq + std::fmt::Debug,
    E::Id: Hash + Eq + std::fmt::Debug,
{
    /// Debug-formats all fields of the subscriber.
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = fmt.debug_struct("TimingSubscriber");
        dbg.field("span_group", &self.span_group);
        dbg.field("event_group", &self.event_group);
        dbg.field("time", &self.time);
        dbg.field("writers", &self.writers);
        dbg.field("reader", &self.reader);
        dbg.finish()
    }
}
impl<S, E> TimingSubscriber<S, E>
where
    S: SpanGroup,
    E: EventGroup,
    S::Id: Clone + Hash + Eq,
    E::Id: Clone + Hash + Eq,
{
    /// Record the time since the previous event in `span` — and in each of its
    /// ancestors — into the histogram for that span's group and `event`'s group.
    fn time(&self, mut span: span::Id, event: &Event) {
        let start = self.time.now();
        let inner = self.writers.read().unwrap();
        // Swap this event's timestamp into the span's `last_event` slot and
        // record the interval since the previous event. If another thread
        // raced us and already stored a timestamp *after* `start`, skip
        // recording so we never compute a negative interval.
        let record =
            move |last_event: &Slab<atomic::AtomicU64>, r: &mut Recorder<u64>, span: &span::Id| {
                let previous = last_event[span_id_to_slab_idx(span)]
                    .swap(self.time.now(), atomic::Ordering::AcqRel);
                if previous > start {
                    return;
                }
                r.saturating_record(start - previous)
            };
        let tid = ThreadId::default();
        let eid = self.event_group.group(event);
        // Fast path: hold only the sharded *read* lock. This works as long as
        // this thread already has a recorder (or can mint one from an idle
        // recorder) for every (span group, event group) pair along the chain.
        if let Some(ref tls) = inner.tls.get(&tid) {
            // NOTE(review): soundness relies on `tid` only ever being produced
            // and used by this thread, so no other thread aliases this
            // `UnsafeCell` entry — confirm against `ThreadId`'s markers.
            let tls = unsafe { &mut *tls.get() };
            loop {
                let sgi = &inner.spans[span_id_to_slab_idx(&span)];
                if let Some(ref mut recorder) =
                    tls.get_mut(&sgi.group).and_then(|rs| rs.get_mut(&eid))
                {
                    // This thread already has a recorder for this pair.
                    record(&inner.last_event, recorder, &span);
                } else if let Some(ref ir) = inner.idle_recorders[&sgi.group].get(&eid) {
                    // Histogram exists, but this thread has no recorder for it
                    // yet: mint one and cache it in our thread-local map.
                    let mut recorder = ir.recorder();
                    record(&inner.last_event, &mut recorder, &span);
                    let r = tls
                        .entry(sgi.group.clone())
                        .or_insert_with(Default::default)
                        .insert(eid.clone(), recorder);
                    assert!(r.is_none());
                } else {
                    // No histogram exists for this pair at all; we need the
                    // write lock to create one — fall through to the slow path,
                    // which resumes from the current `span`.
                    break;
                }
                // Walk up and record against each ancestor span too.
                if let Some(ref psi) = sgi.parent {
                    span = psi.clone();
                } else {
                    return;
                }
            }
        } else {
            // First event ever on this thread: no TLS entry yet — slow path.
        }
        // Slow path: upgrade to the write lock so we can create this thread's
        // TLS map and/or new histograms, then finish walking the chain.
        drop(inner);
        let mut inner = self.writers.write().unwrap();
        let inner = &mut *inner;
        let tls = inner.tls.entry(tid).or_insert_with(Default::default);
        let tls = unsafe { &mut *tls.get() };
        // Split the borrows so the nested closures below can each capture a
        // distinct field of `inner`.
        let nh = &mut inner.new_histogram;
        let created = &mut inner.created;
        let idle = &mut inner.idle_recorders;
        loop {
            let sgi = &inner.spans[span_id_to_slab_idx(&span)];
            let recorder = tls
                .entry(sgi.group.clone())
                .or_insert_with(Default::default)
                .entry(eid.clone())
                .or_insert_with(|| {
                    idle.get_mut(&sgi.group)
                        .unwrap()
                        .entry(eid.clone())
                        .or_insert_with(|| {
                            // First time this (span group, event group) pair is
                            // seen anywhere: build the histogram, keep an idle
                            // recorder for future threads, and hand the
                            // synchronized histogram to the reader side.
                            let h = (nh)(&sgi.group, &eid).into_sync();
                            let ir = h.recorder().into_idle();
                            created.send((sgi.group.clone(), eid.clone(), h)).expect(
                                "WriterState implies ReaderState, which holds the receiver",
                            );
                            ir
                        })
                        .recorder()
                });
            record(&inner.last_event, recorder, &span);
            if let Some(ref psi) = sgi.parent {
                span = psi.clone();
            } else {
                break;
            }
        }
    }
    /// Drop every thread's cached recorders and refresh all histograms so
    /// that previously recorded samples become visible to readers.
    ///
    /// Recorders are re-minted lazily the next time each thread records.
    pub fn force_synchronize(&self) {
        let mut inner = self.writers.write().unwrap();
        for tls in inner.tls.values_mut() {
            // We hold the write lock, so no thread is concurrently using its
            // recorders through the read path.
            let tls = unsafe { &mut *tls.get() };
            tls.clear();
        }
        drop(inner);
        self.with_histograms(|hs| {
            for hs in hs.values_mut() {
                // NOTE(review): zero timeout presumably means "merge what has
                // been flushed without waiting on live recorders" — confirm
                // against hdrhistogram's `refresh_timeout` docs.
                for h in hs.values_mut() {
                    h.refresh_timeout(std::time::Duration::new(0, 0));
                }
            }
        })
    }
    /// Give `f` access to all histograms, keyed by span group and then event
    /// group.
    ///
    /// Newly created histograms are drained from the writer channel first, so
    /// the *set* of histograms is complete as of this call; their contents are
    /// only as fresh as the last refresh (see [`force_synchronize`]).
    pub fn with_histograms<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut HashMap<S::Id, IndexMap<E::Id, SyncHistogram<u64>, Hasher>>) -> R,
    {
        let mut reader = self.reader.lock().unwrap();
        while let Ok((sid, eid, h)) = reader.created.try_recv() {
            let h = reader
                .histograms
                .entry(sid)
                .or_insert_with(IndexMap::default)
                .insert(eid, h);
            // The write path only creates one histogram per pair.
            assert!(
                h.is_none(),
                "second histogram created for same sid/eid combination"
            );
        }
        f(&mut reader.histograms)
    }
}
impl<S, E> Subscriber for TimingSubscriber<S, E>
where
    S: SpanGroup + 'static,
    E: EventGroup + 'static,
    S::Id: Clone + Hash + Eq + 'static,
    E::Id: Clone + Hash + Eq + 'static,
{
    // Never filter at the callsite: every span and event is tracked.
    fn enabled(&self, _: &Metadata) -> bool {
        true
    }
    /// Allocate aligned slots in all three per-span slabs and return the new
    /// span's id (slab index + 1, since span ids must be non-zero).
    fn new_span(&self, span: &span::Attributes) -> span::Id {
        let group = self.span_group.group(span);
        // Explicit parent if given, else this thread's innermost entered span.
        let parent = span
            .parent()
            .cloned()
            .or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned()));
        let sg = SpanGroupContext {
            group,
            parent,
            follows: None,
            meta: span.metadata(),
        };
        let mut inner = self.writers.write().unwrap();
        // The slabs must stay index-aligned; the asserts verify that each slab
        // handed out the same slot.
        let id = inner.refcount.insert(atomic::AtomicUsize::new(1));
        let id2 = inner.spans.insert(sg.clone());
        assert_eq!(id, id2);
        // Ensure the span's group has an (initially empty) idle-recorder map,
        // so the hot path can index `idle_recorders[&group]` unconditionally.
        inner
            .idle_recorders
            .entry(sg.group)
            .or_insert_with(HashMap::default);
        // A span's "last event" starts out as its creation time.
        let id2 = inner
            .last_event
            .insert(atomic::AtomicU64::new(self.time.now()));
        assert_eq!(id, id2);
        span::Id::from_u64(id as u64 + 1)
    }
    // Field values recorded after span creation are ignored.
    fn record(&self, _: &span::Id, _: &span::Record) {}
    fn record_follows_from(&self, span: &span::Id, follows: &span::Id) {
        // The relationship is only remembered; nothing in this file reads it.
        let mut inner = self.writers.write().unwrap();
        inner
            .spans
            .get_mut(span_id_to_slab_idx(span))
            .unwrap()
            .follows = Some(follows.clone());
    }
    fn event(&self, event: &Event) {
        // Attribute the event to its explicit parent, or else this thread's
        // current span.
        let span = event.parent().cloned().or_else(|| {
            SPAN.with(|current_span| {
                let current_span = current_span.borrow();
                current_span.last().cloned()
            })
        });
        if let Some(span) = span {
            self.time(span, event);
        } else {
            // Event outside any span: there is nothing to time against.
        }
    }
    fn enter(&self, span: &span::Id) {
        SPAN.with(|current_span| {
            current_span.borrow_mut().push(span.clone());
        })
    }
    fn exit(&self, span: &span::Id) {
        SPAN.with(|current_span| {
            // Spans must be exited in LIFO order on a given thread.
            let leaving = current_span
                .borrow_mut()
                .pop()
                .expect("told to exit span when not in span");
            assert_eq!(
                &leaving, span,
                "told to exit span that was not most recently entered"
            );
        })
    }
    fn clone_span(&self, span: &span::Id) -> span::Id {
        let inner = self.writers.read().unwrap();
        inner.refcount[span_id_to_slab_idx(span)].fetch_add(1, atomic::Ordering::AcqRel);
        span.clone()
    }
    fn try_close(&self, span: span::Id) -> bool {
        // If a lock is poisoned while this thread is itself panicking, report
        // "not closed" instead of aborting the process via a double panic.
        macro_rules! unwinding_lock {
            ($lock:expr) => {
                match $lock {
                    Ok(g) => g,
                    Err(_) if std::thread::panicking() => {
                        return false;
                    }
                    r @ Err(_) => r.unwrap(),
                }
            };
        };
        // If we held the last reference, free all three per-span slab slots.
        if 1 == unwinding_lock!(self.writers.read()).refcount[span_id_to_slab_idx(&span)]
            .fetch_sub(1, atomic::Ordering::AcqRel)
        {
            let mut inner = unwinding_lock!(self.writers.write());
            inner.last_event.remove(span_id_to_slab_idx(&span));
            inner.refcount.remove(span_id_to_slab_idx(&span));
            inner.spans.remove(span_id_to_slab_idx(&span));
            true
        } else {
            false
        }
    }
    fn current_span(&self) -> span::Current {
        SPAN.with(|current_span| {
            current_span.borrow_mut().last().map(|sid| {
                span::Current::new(
                    sid.clone(),
                    self.writers.read().unwrap().spans[span_id_to_slab_idx(sid)].meta,
                )
            })
        })
        .unwrap_or_else(span::Current::none)
    }
}
/// A zero-sized handle that can downcast a [`Dispatch`] back to a concrete
/// [`TimingSubscriber`] type (see [`Downcaster::downcast`]).
///
/// It holds no `S` or `E` values, so it is `Copy` and `Clone` regardless of
/// whether `S` and `E` are.
#[derive(Debug)]
pub struct Downcaster<S, E> {
    // `fn(S, E)` records the type parameters without owning values of those
    // types, so no auto-trait or drop obligations are inherited from S/E.
    phantom: PhantomData<fn(S, E)>,
}

// Manual impls rather than `#[derive(Copy, Clone)]`: the derives would add
// spurious `S: Copy` / `S: Clone` (and likewise for `E`) bounds even though
// the struct stores no `S` or `E`. (The original already hand-wrote `Clone`
// for this reason; `Copy` now gets the same treatment.)
impl<S, E> Copy for Downcaster<S, E> {}

impl<S, E> Clone for Downcaster<S, E> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<S, E> TimingSubscriber<S, E>
where
    S: SpanGroup,
    E: EventGroup,
    S::Id: Clone + Hash + Eq,
    E::Id: Clone + Hash + Eq,
{
    /// Returns a [`Downcaster`] that can later recover a `&TimingSubscriber`
    /// of this exact type from a [`Dispatch`].
    pub fn downcaster(&self) -> Downcaster<S, E> {
        let phantom = PhantomData;
        Downcaster { phantom }
    }
}
impl<S, E> Downcaster<S, E>
where
    S: SpanGroup + 'static,
    E: EventGroup + 'static,
    S::Id: Clone + Hash + Eq + 'static,
    E::Id: Clone + Hash + Eq + 'static,
{
    /// Returns the [`TimingSubscriber`] behind `d`, or `None` if `d` wraps a
    /// subscriber of a different concrete type.
    pub fn downcast<'a>(&self, d: &'a Dispatch) -> Option<&'a TimingSubscriber<S, E>> {
        d.downcast_ref()
    }
}
/// A compact, process-unique identifier for the current thread, used to key
/// the per-thread recorder maps in `ThreadLocal`.
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Copy, Clone)]
#[repr(transparent)]
struct ThreadId {
    // Index allocated from the global `TID` counter.
    tid: usize,
    // Marker intended to keep a ThreadId pinned to its thread.
    // NOTE(review): `UnsafeCell<()>` is `Send` but `!Sync`, so this makes the
    // type `!Sync` rather than `!Send` despite the field name — confirm
    // whether `!Send` was actually intended.
    _notsend: PhantomData<UnsafeCell<()>>,
}
impl Default for ThreadId {
    /// Returns this thread's id, allocating one from the global `TID` counter
    /// the first time it is requested on a given thread.
    fn default() -> Self {
        MYTID.with(|slot| {
            let mut slot = slot.borrow_mut();
            // Reuse the memoized id, or claim the next counter value and
            // remember it for this thread.
            let tid = *slot.get_or_insert_with(|| TID.fetch_add(1, atomic::Ordering::AcqRel));
            ThreadId {
                tid,
                _notsend: PhantomData,
            }
        })
    }
}
// A minimal hand-rolled thread-local map: `ThreadId` -> per-thread value.
//
// Values are wrapped in `UnsafeCell` so the hot path can mutate *its own*
// thread's entry while holding only a read lock on the surrounding
// `ShardedLock` (see `TimingSubscriber::time`).
#[derive(Default)]
struct ThreadLocal<T>(HashMap<ThreadId, UnsafeCell<T>>);
impl<T> Deref for ThreadLocal<T> {
    type Target = HashMap<ThreadId, UnsafeCell<T>>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<T> DerefMut for ThreadLocal<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
// SAFETY: moving the map moves all owned `T`s with it, so `Send` holds
// whenever `T: Send`.
unsafe impl<T: Send> Send for ThreadLocal<T> {}
// SAFETY: shared access only hands out `&UnsafeCell<T>`; call sites uphold the
// invariant that each thread dereferences only the entry keyed by its own
// `ThreadId`. NOTE(review): soundness hinges on that per-thread-access
// invariant (and on `ThreadId` values not crossing threads) more than on the
// `T: Sync` bound — confirm.
unsafe impl<T: Sync> Sync for ThreadLocal<T> {}