1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
//! Efficient collection of raw timing information. The collected form minimizes overhead during collection
//! and is not convenient to display directly.

use hdrhistogram::Histogram;
use std::{
    collections::HashMap,
    fmt::Debug,
    hash::Hash,
    sync::Arc,
    thread::{self, ThreadId},
    time::Instant,
};
use tracing::{callsite::Identifier, span::Attributes, Id, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

use crate::tlc_param::{TlcBase, TlcDirect, TlcParam};

//=================
// Callsite

/// Provides information about where the tracing span was defined.
///
/// One record is kept per callsite [`Identifier`] (see `RawTrace::callsite_infos`).
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub(crate) struct CallsiteInfo {
    /// Identifier of the span's callsite.
    pub(crate) callsite_id: Identifier,
    /// Name of the span.
    pub(crate) name: &'static str,
    /// Source file of the callsite, if known.
    pub(crate) file: Option<String>,
    /// Source line of the callsite, if known.
    pub(crate) line: Option<u32>,
    /// Callsite of the parent of the span that first populated this record; `None` if that span had no
    /// parent.
    pub(crate) parent: Option<Identifier>,
}

//=================
// Paths

// Types used in span groups or to support data collection.

/// Callsite IDs of a span's ancestors followed by the span's own callsite ID (root first).
type CallsiteIdPath = Vec<Identifier>;
/// Grouping properties of a single span: string pairs produced by the span grouper from the span's attributes.
pub(crate) type Props = Vec<(String, String)>;
/// [`Props`] of a span's ancestors followed by the span's own (root first). Entries are `Arc`-wrapped so that
/// cloning a parent's path to extend it for a child span does not deep-copy the properties.
type PropsPath = Vec<Arc<Props>>;

/// Private form of [`crate::SpanGroup`] used during trace collection, where it is more efficient than
/// [`crate::SpanGroup`]. It is the key of [`RawTrace::timings`].
///
/// Both paths have one entry per span level, ordered from the root span down to the span group itself.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub(crate) struct SpanGroupPriv {
    /// Callsite ID of the span group preceded by the callsite IDs of its ancestors.
    pub(crate) callsite_id_path: CallsiteIdPath,

    /// Properties of the span group preceded by the properties of its ancestors.
    pub(crate) props_path: PropsPath,
}

impl SpanGroupPriv {
    /// Returns the span group of the parent span: this group with the last entry of each path removed.
    ///
    /// Returns `None` when there is no parent, i.e. for a root group (paths of length 1) and, defensively,
    /// for an empty path (which previously underflowed `len - 1` and panicked).
    ///
    /// Relies on `props_path` being at least as long as `callsite_id_path`; both are extended together
    /// when spans are created.
    pub(crate) fn parent(&self) -> Option<Self> {
        let (_, ancestor_ids) = self.callsite_id_path.split_last()?;
        if ancestor_ids.is_empty() {
            return None;
        }
        let parent_len = ancestor_ids.len();
        Some(SpanGroupPriv {
            callsite_id_path: ancestor_ids.to_vec(),
            props_path: self.props_path[..parent_len].to_vec(),
        })
    }
}

//=================
// Timing and Timings

/// Alias of [`Histogram<u64>`].
///
/// Span latencies are recorded into it in microseconds.
pub type Timing = Histogram<u64>;

/// Constructs a [`Timing`]. The arguments correspond to [Histogram::high] and [Histogram::sigfig].
///
/// The lowest discernible value is fixed at 1, and auto-resizing is turned on via `auto(true)`.
///
/// # Panics
///
/// Panics if `Histogram::new_with_bounds` rejects the given bounds.
pub(crate) fn new_timing(hist_high: u64, hist_sigfig: u8) -> Timing {
    const LOWEST_DISCERNIBLE_VALUE: u64 = 1;

    let mut timing =
        Histogram::<u64>::new_with_bounds(LOWEST_DISCERNIBLE_VALUE, hist_high, hist_sigfig)
            .expect("should not happen given histogram construction");
    timing.auto(true);
    timing
}

#[doc(hidden)]
/// Type of latency information internally collected for span groups. The key is [SpanGroupPriv], which is as
/// light as possible to minimize processing overhead when accessing the map. Therefore, part of the information
/// required to produce the ultimate results is kept as a separate `callsite_infos` map keyed by [`Identifier`].
#[derive(Clone)]
pub struct RawTrace {
    /// Latency histogram for each span group.
    pub(crate) timings: HashMap<SpanGroupPriv, Timing>,
    /// Callsite information for each callsite seen when spans were created.
    pub(crate) callsite_infos: HashMap<Identifier, CallsiteInfo>,
}

impl RawTrace {
    /// Creates a `RawTrace` that holds no timings and no callsite information.
    pub(crate) fn new() -> Self {
        let timings = HashMap::new();
        let callsite_infos = HashMap::new();
        Self {
            timings,
            callsite_infos,
        }
    }
}

/// Type of accumulator of thread-local values, prior to transforming the collected information to a [`crate::Timings`].
/// Used to minimize the time holding the control lock during post-processing.
/// The downside is that more memory is used when there are many threads.
///
/// Each element is the [`RawTrace`] of one thread; elements are not merged when they are accumulated.
pub(crate) type AccRawTrace = Vec<RawTrace>;

//=================
// SpanTiming

/// Information about a span stored in the registry.
///
/// Inserted into the span's extensions when the span is created. It is read back when the span closes,
/// and also when a child span is created so the child can extend its parent's paths.
#[derive(Debug)]
struct SpanTiming {
    /// Callsite IDs from the root span down to this span.
    callsite_id_path: CallsiteIdPath,
    /// Grouping properties from the root span down to this span, parallel to `callsite_id_path`.
    props_path: PropsPath,
    /// Time at which the span was created; latency is measured from here until the span closes.
    created_at: Instant,
}

/// Accumulation step for one thread's data: appends `raw_trace` to `acc` as a separate element.
///
/// No merging happens here. `tid` is used only for the debug log.
pub(crate) fn op(raw_trace: RawTrace, acc: &mut AccRawTrace, tid: ThreadId) {
    log::debug!("executing `op` for {:?}", tid);
    let accumulator = acc;
    accumulator.push(raw_trace);
}

pub(crate) fn op_r(acc1: RawTrace, acc2: RawTrace) -> RawTrace {
    let mut timings = acc1.timings;
    for (k, v) in acc2.timings {
        let hist = timings.get_mut(&k);
        match hist {
            Some(hist) => hist
                .add(v)
                .expect("should not happen given histogram construction"),
            None => {
                timings.insert(k, v);
            }
        }
    }

    let callsite_infos: HashMap<Identifier, CallsiteInfo> = acc1
        .callsite_infos
        .into_iter()
        .chain(acc2.callsite_infos)
        .collect();

    RawTrace {
        timings,
        callsite_infos,
    }
}

//=================
// LatencyTraceCfg

/// Configuration information for [`LatencyTrace`](crate::LatencyTrace). It is instantiated with its [`LatencyTraceCfg::default`] method
/// and can be customized with its other methods.
pub struct LatencyTraceCfg {
    /// Computes, from a new span's attributes, the properties used to group it.
    pub(crate) span_grouper: SpanGrouper,
    /// Passed as the `high` bound when constructing each timing histogram.
    pub(crate) hist_high: u64,
    /// Passed as the `sigfig` argument when constructing each timing histogram.
    pub(crate) hist_sigfig: u8,
}

//=================
// SpanGrouper

/// Internal type of span groupers: computes, from a new span's [`Attributes`], the string pairs used as the
/// span's grouping properties.
type SpanGrouper = Arc<dyn Fn(&Attributes) -> Vec<(String, String)> + Send + Sync + 'static>;

//=================
// LatencyTrace

#[doc(hidden)]
/// Core generic type supporting latency measurements. Supports latency collection and aggregation with different
/// [`thread_local_collect::tlm`] modules.
///
/// Implements [`tracing_subscriber::Layer`] and provides access to the latencies collected for different span groups.
///
/// There should be a single instance of [`LatencyTraceG`] in a process. That instance is set
/// (by method [`Self::activated`] or [`Self::activated_default`])
/// as the global default [`tracing::Subscriber`], of which there can be only one and it can't be changed once
/// it is set.
#[derive(Clone)]
pub struct LatencyTraceG<P>
where
    P: TlcParam,
{
    /// Gives access to the per-thread `RawTrace` data and to its accumulation.
    pub(crate) control: P::Control,
    /// Computes, from a new span's attributes, the properties used to group it.
    span_grouper: SpanGrouper,
    /// Passed as the `high` bound when a timing histogram is first created for a span group.
    pub(crate) hist_high: u64,
    /// Passed as the `sigfig` argument when a timing histogram is first created for a span group.
    pub(crate) hist_sigfig: u8,
}

impl<P> LatencyTraceG<P>
where
    P: TlcParam,
    P::Control: TlcBase,
{
    /// Creates an instance from `config`, with a newly constructed control object.
    pub(crate) fn new(config: LatencyTraceCfg) -> Self {
        LatencyTraceG {
            control: P::Control::new(),
            span_grouper: config.span_grouper,
            hist_high: config.hist_high,
            hist_sigfig: config.hist_sigfig,
        }
    }

    /// Updates timings for the given span group. Called by [`Layer`] impl.
    ///
    /// Applies `f` to the [`Timing`] stored for `span_group_priv` in the data accessed through
    /// `self.control`. If that timing is absent, it is first created with `new_timing`, using
    /// `hist_high`/`hist_sigfig`.
    fn update_timings(&self, span_group_priv: &SpanGroupPriv, f: impl FnOnce(&mut Timing)) {
        self.control.with_data_mut(|raw_trace| {
            // Look up first and clone the key only when the entry is missing. This keeps the common
            // (already-present) path allocation-free; `entry()` would need an owned key on every call.
            let timing = {
                if let Some(timing) = raw_trace.timings.get_mut(span_group_priv) {
                    timing
                } else {
                    log::trace!(
                        "thread-loacal Timing created for {:?} on {:?}",
                        span_group_priv,
                        thread::current().id()
                    );
                    raw_trace.timings.insert(
                        span_group_priv.clone(),
                        new_timing(self.hist_high, self.hist_sigfig),
                    );
                    raw_trace
                        .timings
                        .get_mut(span_group_priv)
                        .expect("impossible: span_group_priv key was just inserted")
                }
            };

            f(timing);

            log::trace!(
                "exiting `update_timings` for {:?} on {:?}",
                span_group_priv,
                thread::current().id()
            );
        });
    }

    /// Records the [`CallsiteInfo`] for `callsite_id` in the data accessed through `self.control`, unless
    /// that data already has a record for it.
    ///
    /// `callsite_info` is invoked only when the record is missing, so an existing record is never rebuilt
    /// or overwritten.
    fn update_callsite_infos(
        &self,
        callsite_id: Identifier,
        callsite_info: impl FnOnce() -> CallsiteInfo,
    ) {
        self.control.with_data_mut(|raw_trace| {
            // Entry API: one hash lookup instead of `get` followed by `insert`.
            raw_trace
                .callsite_infos
                .entry(callsite_id)
                .or_insert_with(callsite_info);
        });
    }
}

impl<P> LatencyTraceG<P>
where
    P: TlcParam,
    P::Control: TlcDirect,
{
    /// Extracts the accumulated timings.
    ///
    /// Calls `take_tls` on the control object, then returns the result of `take_acc`. An empty
    /// [`AccRawTrace`] is passed to `take_acc`; presumably this is the value left in place of the taken
    /// accumulator (confirm against the `TlcDirect` implementation).
    pub(crate) fn take_acc_timings(&self) -> AccRawTrace {
        log::trace!("entering `take_acc_timings`");
        let control = &self.control;
        control.take_tls();
        let empty_acc: AccRawTrace = Vec::new();
        control.take_acc(empty_acc)
    }
}

impl<S, P> Layer<S> for LatencyTraceG<P>
where
    S: Subscriber,
    S: for<'lookup> LookupSpan<'lookup>,
    P: TlcParam + 'static,
    P::Control: TlcBase,
{
    /// Stores a [`SpanTiming`] in the new span's extensions and registers the span's [`CallsiteInfo`].
    ///
    /// The span's paths are its parent's paths, read from the parent's [`SpanTiming`], extended with this
    /// span's callsite ID and the properties returned by the span grouper. A span without a parent starts
    /// new single-element paths.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let span = ctx
            .span(id)
            .expect("impossible: there is no span with the given id");
        log::trace!("`on_new_span` start: name={}, id={:?}", span.name(), id);
        let meta = span.metadata();
        let callsite_id = meta.callsite();
        let parent_span = span.parent();

        let props = (self.span_grouper)(attrs);
        let (callsite_id_path, props_path) = match &parent_span {
            None => (vec![callsite_id.clone()], vec![Arc::new(props)]),
            Some(parent_span) => {
                // Expected to have been stored when the parent itself went through `on_new_span`.
                let ext = parent_span.extensions();
                let pst = ext
                    .get::<SpanTiming>()
                    .expect("span extensions does not contain SpanTiming record");
                let mut callsite_id_path = pst.callsite_id_path.clone();
                callsite_id_path.push(callsite_id.clone());
                let mut props_path = pst.props_path.clone();
                props_path.push(Arc::new(props));
                (callsite_id_path, props_path)
            }
        };

        span.extensions_mut().insert(SpanTiming {
            callsite_id_path,
            props_path,
            created_at: Instant::now(),
        });

        // Deferred construction: `update_callsite_infos` only calls this when the callsite is not
        // already recorded.
        let callsite_info = {
            let callsite_id = callsite_id.clone();
            let span = &span;
            move || CallsiteInfo {
                callsite_id,
                name: span.name(),
                file: meta.file().map(|s| s.to_owned()),
                line: meta.line(),
                parent: parent_span
                    .as_ref()
                    .map(|parent_ref| parent_ref.metadata().callsite()),
            }
        };

        self.update_callsite_infos(callsite_id, callsite_info);

        log::trace!("`on_new_span` end: name={}, id={:?}", span.name(), id);
    }

    // No need for fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {

    // No need for fn on_exit(&self, id: &Id, ctx: Context<'_, S>)

    /// Records the time elapsed since the span was created, in microseconds, into its span group's timing.
    ///
    /// Because enter/exit are not tracked, the measured latency includes time during which the span was
    /// not entered.
    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        let span = ctx
            .span(&id)
            .expect("impossible: there is no span with the given id");
        log::trace!("`on_close` start: name={}, id={:?}", span.name(), id);

        let ext = span.extensions();
        let span_timing = ext
            .get::<SpanTiming>()
            .expect("span extensions does not contain SpanTiming record");

        let span_group_priv = SpanGroupPriv {
            callsite_id_path: span_timing.callsite_id_path.clone(),
            props_path: span_timing.props_path.clone(),
        };

        self.update_timings(&span_group_priv, |hist| {
            // `elapsed()` is `Instant::now() - created_at`, evaluated here as before. The `u128 -> u64`
            // cast cannot truncate for any realistic span duration.
            hist.record(span_timing.created_at.elapsed().as_micros() as u64)
                .expect("should not happen given histogram construction");
        });

        log::trace!(
            "`on_close` completed call to update_timings: name={}, id={:?}",
            span.name(),
            id
        );

        log::trace!("`on_close` end: name={}, id={:?}", span.name(), id);
    }
}