use super::*;
use crossbeam::sync::ShardedLock;
use hdrhistogram::SyncHistogram;
use indexmap::IndexMap;
use slab::Slab;
use std::cell::RefCell;
use std::hash::Hash;
use std::sync::atomic;
thread_local! {
static SPAN: RefCell<Vec<span::Id>> = RefCell::new(Vec::new());
}
pub struct TimingSubscriber<S = group::ByName, E = group::ByMessage>
where
S: SpanGroup,
E: EventGroup,
S::Id: Hash + Eq,
E::Id: Hash + Eq,
{
spans: ShardedLock<Slab<SpanGroupContext<S::Id>>>,
timing: Timing<S, E>,
}
impl<S, E> std::fmt::Debug for TimingSubscriber<S, E>
where
S: SpanGroup + std::fmt::Debug,
S::Id: Hash + Eq + std::fmt::Debug,
E: EventGroup + std::fmt::Debug,
E::Id: Hash + Eq + std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimingSubscriber")
.field("spans", &self.spans)
.field("timing", &self.timing)
.finish()
}
}
#[derive(Debug)]
struct SpanGroupContext<S> {
parent: Option<span::Id>,
follows: Option<span::Id>,
meta: &'static Metadata<'static>,
state: SpanState<S>,
refcount: atomic::AtomicUsize,
}
impl<S, E> TimingSubscriber<S, E>
where
S: SpanGroup,
E: EventGroup,
S::Id: Hash + Eq,
E::Id: Hash + Eq,
{
pub(crate) fn new(timing: Timing<S, E>) -> Self {
Self {
timing,
spans: Default::default(),
}
}
pub fn force_synchronize(&self) {
self.timing.force_synchronize()
}
pub fn with_histograms<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut HashMap<S::Id, IndexMap<E::Id, SyncHistogram<u64>, Hasher>>) -> R,
{
self.timing.with_histograms(f)
}
}
impl<S, E> Subscriber for TimingSubscriber<S, E>
where
S: SpanGroup + 'static,
E: EventGroup + 'static,
S::Id: Clone + Hash + Eq + 'static,
E::Id: Clone + Hash + Eq + 'static,
{
fn enabled(&self, _: &Metadata) -> bool {
true
}
fn new_span(&self, span: &span::Attributes) -> span::Id {
let group = self.timing.span_group.group(span);
let parent = span
.parent()
.cloned()
.or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned()));
let sg = SpanGroupContext {
parent,
follows: None,
meta: span.metadata(),
refcount: atomic::AtomicUsize::new(1),
state: SpanState {
group: group.clone(),
last_event: atomic::AtomicU64::new(self.timing.time.raw()),
},
};
let id = {
let mut inner = self.spans.write().unwrap();
inner.insert(sg)
};
self.timing.ensure_group(group);
span::Id::from_u64(id as u64 + 1)
}
fn record(&self, _: &span::Id, _: &span::Record) {}
fn record_follows_from(&self, span: &span::Id, follows: &span::Id) {
let mut inner = self.spans.write().unwrap();
inner.get_mut(span_id_to_slab_idx(span)).unwrap().follows = Some(follows.clone());
}
fn event(&self, event: &Event) {
let span = event.parent().cloned().or_else(|| {
SPAN.with(|current_span| {
let current_span = current_span.borrow();
current_span.last().cloned()
})
});
if let Some(span) = span {
let inner = self.spans.read().unwrap();
let inner = &*inner;
self.timing.time(event, |on_each| {
let mut current = Some(span.clone());
while let Some(ref at) = current {
let idx = span_id_to_slab_idx(&at);
let span = &inner[idx];
if !on_each(&span.state) {
break;
}
current = span.parent.clone();
}
});
} else {
}
}
fn enter(&self, span: &span::Id) {
SPAN.with(|current_span| {
current_span.borrow_mut().push(span.clone());
})
}
fn exit(&self, span: &span::Id) {
SPAN.with(|current_span| {
let leaving = current_span
.borrow_mut()
.pop()
.expect("told to exit span when not in span");
assert_eq!(
&leaving, span,
"told to exit span that was not most recently entered"
);
})
}
fn clone_span(&self, span: &span::Id) -> span::Id {
let inner = self.spans.read().unwrap();
inner[span_id_to_slab_idx(span)]
.refcount
.fetch_add(1, atomic::Ordering::AcqRel);
span.clone()
}
fn try_close(&self, span: span::Id) -> bool {
macro_rules! unwinding_lock {
($lock:expr) => {
match $lock {
Ok(g) => g,
Err(_) if std::thread::panicking() => {
return false;
}
r @ Err(_) => r.unwrap(),
}
};
}
if 1 == unwinding_lock!(self.spans.read())[span_id_to_slab_idx(&span)]
.refcount
.fetch_sub(1, atomic::Ordering::AcqRel)
{
if self.timing.span_close_events {
let inner = unwinding_lock!(self.spans.read());
if let Some(span_info) = inner.get(span_id_to_slab_idx(&span)) {
let meta = span_info.meta;
let fs = field::FieldSet::new(&["message"], meta.callsite());
let fld = fs.iter().next().unwrap();
let v = [(&fld, Some(&"close" as &dyn field::Value))];
let vs = fs.value_set(&v);
let e = Event::new_child_of(span.clone(), meta, &vs);
self.event(&e);
}
}
let mut inner = unwinding_lock!(self.spans.write());
inner.remove(span_id_to_slab_idx(&span));
true
} else {
false
}
}
fn current_span(&self) -> span::Current {
SPAN.with(|current_span| {
current_span.borrow_mut().last().map(|sid| {
span::Current::new(
sid.clone(),
self.spans.read().unwrap()[span_id_to_slab_idx(sid)].meta,
)
})
})
.unwrap_or_else(span::Current::none)
}
}
#[derive(Debug, Copy)]
pub struct Downcaster<S, E> {
phantom: PhantomData<fn(S, E)>,
}
impl<S, E> Clone for Downcaster<S, E> {
fn clone(&self) -> Self {
Self {
phantom: PhantomData,
}
}
}
impl<S, E> TimingSubscriber<S, E>
where
S: SpanGroup,
E: EventGroup,
S::Id: Clone + Hash + Eq,
E::Id: Clone + Hash + Eq,
{
pub fn downcaster(&self) -> Downcaster<S, E> {
Downcaster {
phantom: PhantomData,
}
}
}
impl<S, E> Downcaster<S, E>
where
S: SpanGroup + 'static,
E: EventGroup + 'static,
S::Id: Clone + Hash + Eq + 'static,
E::Id: Clone + Hash + Eq + 'static,
{
pub fn downcast<'a>(&self, d: &'a Dispatch) -> Option<&'a TimingSubscriber<S, E>> {
d.downcast_ref()
}
}