use hdrhistogram::Histogram;
use std::{
collections::HashMap,
fmt::Debug,
hash::Hash,
sync::Arc,
thread::{self, ThreadId},
time::Instant,
};
use tracing::{callsite::Identifier, span::Attributes, Id, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
use crate::tlc_param::{TlcBase, TlcDirect, TlcParam};
/// Static description of a span callsite.
///
/// One value is stored per callsite identifier in `RawTrace::callsite_infos`; it is built
/// in `on_new_span` the first time a span from that callsite is seen.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub(crate) struct CallsiteInfo {
/// Identifier of the callsite, taken from the span's metadata.
pub(crate) callsite_id: Identifier,
/// Name of the span.
pub(crate) name: &'static str,
/// Source file reported by the span's metadata, if any.
pub(crate) file: Option<String>,
/// Source line reported by the span's metadata, if any.
pub(crate) line: Option<u32>,
/// Callsite of the parent span observed when this record was created; `None` for a root span.
pub(crate) parent: Option<Identifier>,
}
/// Callsite identifiers from the outermost ancestor span down to a given span,
/// one entry per nesting level.
type CallsiteIdPath = Vec<Identifier>;
/// String pairs produced by the span grouper for a single span.
pub(crate) type Props = Vec<(String, String)>;
/// The `Props` of every span from the outermost ancestor down to a given span;
/// built in parallel with `CallsiteIdPath`, so both have the same length.
type PropsPath = Vec<Arc<Props>>;
/// Key under which timings are aggregated in `RawTrace::timings`.
///
/// Two spans fall in the same group when both their callsite path and their
/// properties path are equal.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub(crate) struct SpanGroupPriv {
/// Callsites from the outermost ancestor span down to the span itself.
pub(crate) callsite_id_path: CallsiteIdPath,
/// Grouping properties of each span along `callsite_id_path`.
pub(crate) props_path: PropsPath,
}
impl SpanGroupPriv {
    /// Returns the group of the enclosing span: this group with the last element of
    /// both paths removed.
    ///
    /// Returns `None` when the callsite path has at most one element, since such a
    /// group has no parent.
    ///
    /// # Panics
    ///
    /// Panics if `props_path` is shorter than the parent path length, which would
    /// violate the invariant that both paths are built in parallel.
    pub(crate) fn parent(&self) -> Option<Self> {
        let len = self.callsite_id_path.len();
        // `<= 1` rather than `== 1`: an empty path must not reach `len - 1`,
        // which would underflow and panic.
        if len <= 1 {
            return None;
        }
        let parent_len = len - 1;
        Some(SpanGroupPriv {
            callsite_id_path: self.callsite_id_path[..parent_len].to_vec(),
            props_path: self.props_path[..parent_len].to_vec(),
        })
    }
}
/// Histogram type used to aggregate span durations; the layer's `on_close` records
/// each duration in microseconds.
pub type Timing = Histogram<u64>;
pub(crate) fn new_timing(hist_high: u64, hist_sigfig: u8) -> Timing {
let mut hist = Histogram::<u64>::new_with_bounds(1, hist_high, hist_sigfig)
.expect("should not happen given histogram construction");
hist.auto(true);
hist
}
/// Raw collected latency data: one histogram per span group plus the callsite
/// descriptions needed to interpret the groups.
#[doc(hidden)]
#[derive(Clone)]
pub struct RawTrace {
/// Duration histogram for each span group.
pub(crate) timings: HashMap<SpanGroupPriv, Timing>,
/// Description of each callsite seen, keyed by callsite identifier.
pub(crate) callsite_infos: HashMap<Identifier, CallsiteInfo>,
}
impl RawTrace {
    /// Creates a `RawTrace` with no timings and no callsite infos.
    pub(crate) fn new() -> Self {
        let timings = HashMap::new();
        let callsite_infos = HashMap::new();
        Self {
            timings,
            callsite_infos,
        }
    }
}
/// Accumulator of `RawTrace` values; `op` appends one entry per call.
pub(crate) type AccRawTrace = Vec<RawTrace>;
/// Per-span record stored in the span's extensions by `on_new_span` and read back
/// by `on_close` (and by child spans when they are created).
#[derive(Debug)]
struct SpanTiming {
/// Callsites from the outermost ancestor span down to this span.
callsite_id_path: CallsiteIdPath,
/// Grouping properties of each span along `callsite_id_path`.
props_path: PropsPath,
/// Instant at which `on_new_span` ran for this span; the start of the measured duration.
created_at: Instant,
}
/// Accumulation step: appends `raw_trace` to `acc`.
///
/// `tid` is used only in the debug log message.
// NOTE(review): presumably invoked by the control object to fold a thread's data into
// the shared accumulator — confirm against `tlc_param`.
pub(crate) fn op(raw_trace: RawTrace, acc: &mut AccRawTrace, tid: ThreadId) {
log::debug!("executing `op` for {:?}", tid);
acc.push(raw_trace);
}
pub(crate) fn op_r(acc1: RawTrace, acc2: RawTrace) -> RawTrace {
let mut timings = acc1.timings;
for (k, v) in acc2.timings {
let hist = timings.get_mut(&k);
match hist {
Some(hist) => hist
.add(v)
.expect("should not happen given histogram construction"),
None => {
timings.insert(k, v);
}
}
}
let callsite_infos: HashMap<Identifier, CallsiteInfo> = acc1
.callsite_infos
.into_iter()
.chain(acc2.callsite_infos)
.collect();
RawTrace {
timings,
callsite_infos,
}
}
/// Configuration consumed by `LatencyTraceG::new`.
pub struct LatencyTraceCfg {
/// Callback that computes the grouping properties of a span from its attributes.
pub(crate) span_grouper: SpanGrouper,
/// Initial highest trackable value passed to `new_timing` for every histogram.
pub(crate) hist_high: u64,
/// Number of significant digits passed to `new_timing` for every histogram.
pub(crate) hist_sigfig: u8,
}
/// Shared, thread-safe callback mapping a new span's attributes to the string pairs
/// used to group it.
type SpanGrouper = Arc<dyn Fn(&Attributes) -> Vec<(String, String)> + Send + Sync + 'static>;
/// `tracing_subscriber` layer that records, for each span group, a histogram of the
/// time elapsed between span creation and span close.
///
/// Generic over `P`, which selects the control object (`P::Control`) that stores the
/// collected data.
#[doc(hidden)]
#[derive(Clone)]
pub struct LatencyTraceG<P>
where
P: TlcParam,
{
/// Control object through which the collected `RawTrace` data is accessed.
pub(crate) control: P::Control,
/// Callback that computes the grouping properties of a span from its attributes.
span_grouper: SpanGrouper,
/// Initial highest trackable value used when a new histogram is created.
pub(crate) hist_high: u64,
/// Number of significant digits used when a new histogram is created.
pub(crate) hist_sigfig: u8,
}
impl<P> LatencyTraceG<P>
where
    P: TlcParam,
    P::Control: TlcBase,
{
    /// Builds a layer from `config`, with a fresh control object obtained from
    /// `P::Control::new()`.
    pub(crate) fn new(config: LatencyTraceCfg) -> Self {
        LatencyTraceG {
            control: P::Control::new(),
            span_grouper: config.span_grouper,
            hist_high: config.hist_high,
            hist_sigfig: config.hist_sigfig,
        }
    }

    /// Applies `f` to the histogram stored for `span_group_priv`, creating an empty
    /// histogram first if the group has none yet.
    ///
    /// The data is reached through `control.with_data_mut`.
    // NOTE(review): the log text suggests that data is thread-local — confirm against `TlcBase`.
    fn update_timings(&self, span_group_priv: &SpanGroupPriv, f: impl FnOnce(&mut Timing)) {
        self.control.with_data_mut(|raw_trace| {
            // Look up by reference first: the key (two vectors) is cloned only when the
            // group is missing, which keeps the common hit path allocation-free.
            let timing = if let Some(existing) = raw_trace.timings.get_mut(span_group_priv) {
                existing
            } else {
                log::trace!(
                    "thread-local Timing created for {:?} on {:?}",
                    span_group_priv,
                    thread::current().id()
                );
                // `entry` hands back the freshly inserted value directly, so no second
                // lookup or "impossible" `expect` is needed.
                raw_trace
                    .timings
                    .entry(span_group_priv.clone())
                    .or_insert_with(|| new_timing(self.hist_high, self.hist_sigfig))
            };

            f(timing);

            log::trace!(
                "exiting `update_timings` for {:?} on {:?}",
                span_group_priv,
                thread::current().id()
            );
        });
    }

    /// Stores the callsite description for `callsite_id` unless one is already present.
    ///
    /// `callsite_info` is invoked only when the entry is missing.
    fn update_callsite_infos(
        &self,
        callsite_id: Identifier,
        callsite_info: impl FnOnce() -> CallsiteInfo,
    ) {
        self.control.with_data_mut(|raw_trace| {
            raw_trace
                .callsite_infos
                .entry(callsite_id)
                .or_insert_with(callsite_info);
        });
    }
}
impl<P> LatencyTraceG<P>
where
P: TlcParam,
P::Control: TlcDirect,
{
/// Returns the accumulated raw traces held by the control object.
///
/// Calls `take_tls` on the control and then returns the result of `take_acc`,
/// passing it a fresh, empty `AccRawTrace`.
// NOTE(review): presumably `take_tls` moves pending thread-local data into the
// accumulator and `take_acc` swaps in the empty replacement — confirm against `TlcDirect`.
pub(crate) fn take_acc_timings(&self) -> AccRawTrace {
log::trace!("entering `take_acc_timings`");
self.control.take_tls();
self.control.take_acc(AccRawTrace::new())
}
}
impl<S, P> Layer<S> for LatencyTraceG<P>
where
    S: Subscriber,
    S: for<'lookup> LookupSpan<'lookup>,
    P: TlcParam + 'static,
    P::Control: TlcBase,
{
    /// Attaches a [`SpanTiming`] record to the new span and registers its callsite.
    ///
    /// The record holds the span's callsite path and properties path (the parent's
    /// paths extended with this span's own callsite and grouper output) together with
    /// the creation instant.
    ///
    /// # Panics
    ///
    /// Panics if the span cannot be found, or if the parent span carries no
    /// `SpanTiming` record.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let span = ctx
            .span(id)
            .expect("impossible: there is no span with the given id");
        log::trace!("`on_new_span` start: name={}, id={:?}", span.name(), id);

        let meta = span.metadata();
        let callsite_id = meta.callsite();
        let parent_span = span.parent();
        let props = Arc::new((self.span_grouper)(attrs));

        // Start from the parent's paths (empty for a root span), then append this span.
        let (mut callsite_id_path, mut props_path) = match parent_span.as_ref() {
            Some(parent) => {
                let parent_ext = parent.extensions();
                let parent_timing = parent_ext
                    .get::<SpanTiming>()
                    .expect("span extensions does not contain SpanTiming record");
                (
                    parent_timing.callsite_id_path.clone(),
                    parent_timing.props_path.clone(),
                )
            }
            None => (Vec::new(), Vec::new()),
        };
        callsite_id_path.push(callsite_id.clone());
        props_path.push(props);

        span.extensions_mut().insert(SpanTiming {
            callsite_id_path,
            props_path,
            created_at: Instant::now(),
        });

        // Built lazily: the description is only materialised for a callsite not seen before.
        let make_callsite_info = {
            let callsite_id = callsite_id.clone();
            let span_name = span.name();
            move || CallsiteInfo {
                callsite_id,
                name: span_name,
                file: meta.file().map(str::to_owned),
                line: meta.line(),
                parent: parent_span
                    .as_ref()
                    .map(|parent| parent.metadata().callsite()),
            }
        };
        self.update_callsite_infos(callsite_id, make_callsite_info);

        log::trace!("`on_new_span` end: name={}, id={:?}", span.name(), id);
    }

    /// Records the time elapsed since the span's creation, in microseconds, into the
    /// histogram of the span's group.
    ///
    /// # Panics
    ///
    /// Panics if the span cannot be found, if it carries no `SpanTiming` record, or if
    /// the histogram rejects the value.
    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        let span = ctx
            .span(&id)
            .expect("impossible: there is no span with the given id");
        log::trace!("`on_close` start: name={}, id={:?}", span.name(), id);

        let ext = span.extensions();
        let SpanTiming {
            callsite_id_path,
            props_path,
            created_at,
        } = ext
            .get::<SpanTiming>()
            .expect("span extensions does not contain SpanTiming record");

        let span_group_priv = SpanGroupPriv {
            callsite_id_path: callsite_id_path.clone(),
            props_path: props_path.clone(),
        };

        self.update_timings(&span_group_priv, |hist| {
            // The u128 -> u64 cast could only truncate after several hundred thousand years.
            let elapsed_micros = created_at.elapsed().as_micros() as u64;
            hist.record(elapsed_micros)
                .expect("should not happen given histogram construction");
        });

        log::trace!(
            "`on_close` completed call to update_timings: name={}, id={:?}",
            span.name(),
            id
        );
        log::trace!("`on_close` end: name={}, id={:?}", span.name(), id);
    }
}