1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
//! Publicly exported core [`LatencyTrace`]-related types and methods.

use std::{collections::BTreeMap, sync::Arc, thread};

use hdrhistogram::Histogram;
use tracing::span::Attributes;

use crate::{
    lt_collect_g::LatencyTraceG,
    summary_stats,
    tlc_param::{Either, Joined, Probed},
    SummaryStats, Wrapper,
};
pub use crate::{
    lt_collect_g::{LatencyTraceCfg, Timing},
    lt_refine_g::{SpanGroup, Timings, TimingsView},
    lt_report_g::ActivationError,
    probed_trace::ProbedTrace,
};

//==============
// Exported aliases

#[doc(hidden)]
/// Alias for `LatencyTraceG<Joined>`.
///
/// Hidden from the generated documentation; used for benchmarking purposes only.
pub type LatencyTraceJ = LatencyTraceG<Joined>;

#[doc(hidden)]
/// Alias for `LatencyTraceG<Either>`.
///
/// Hidden from the generated documentation; used for benchmarking purposes only.
pub type LatencyTraceE = LatencyTraceG<Either>;

impl LatencyTraceE {
    /// Forwards to `Either::select_probed`.
    ///
    /// NOTE(review): presumably switches the `Either` parameter to its probed
    /// behaviour for benchmarking -- confirm against `tlc_param`.
    pub fn select_probed() {
        Either::select_probed();
    }

    /// Forwards to `Either::select_joined`.
    ///
    /// NOTE(review): presumably switches the `Either` parameter to its joined
    /// behaviour for benchmarking -- confirm against `tlc_param`.
    pub fn select_joined() {
        Either::select_joined();
    }
}

//==============
// pub impl for LatencyTraceCfg

impl LatencyTraceCfg {
    /// Creates a new [`LatencyTraceCfg`] the same as `self` but with the given `hist_high`
    /// (see [hdrhistogram::Histogram::high]).
    pub fn with_hist_high(&self, hist_high: u64) -> Self {
        LatencyTraceCfg {
            span_grouper: self.span_grouper.clone(),
            hist_high,
            hist_sigfig: self.hist_sigfig,
        }
    }

    /// Creates a new [`LatencyTraceCfg`] the same as `self` but with the given `hist_sigfig`
    /// (see [hdrhistogram::Histogram::sigfig]).
    pub fn with_hist_sigfig(&self, hist_sigfig: u8) -> Self {
        LatencyTraceCfg {
            span_grouper: self.span_grouper.clone(),
            hist_high: self.hist_high,
            hist_sigfig,
        }
    }

    /// Creates a new [`LatencyTraceCfg`] the same as `self` but with the given `span_grouper`.
    pub fn with_span_grouper(
        &self,
        span_grouper: impl Fn(&Attributes) -> Vec<(String, String)> + Send + Sync + 'static,
    ) -> Self {
        LatencyTraceCfg {
            span_grouper: Arc::new(span_grouper),
            hist_high: self.hist_high,
            hist_sigfig: self.hist_sigfig,
        }
    }
}

//==============
// pub impl for LatencyTrace

/// Core type supporting latency measurements.
///
/// Encapsulates an implementation of [`tracing_subscriber::Layer`] and provides access to the latencies collected
/// for different span groups.
///
/// There should be a single instance of [`LatencyTrace`] in a process. That instance is set
/// (by method [`Self::activated`] or [`Self::activated_default`])
/// as the global default [`tracing::Subscriber`], of which there can be only one and it can't be changed once
/// it is set.
///
/// This is a newtype over `LatencyTraceG<Probed>`; the wrapped value is only visible within the crate.
#[derive(Clone)]
pub struct LatencyTrace(pub(crate) LatencyTraceG<Probed>);

impl LatencyTrace {
    /// Returns the active instance of `Self` if it exists.
    pub fn active() -> Option<Self> {
        Some(Self(LatencyTraceG::active()?))
    }

    /// Returns the active instance of `Self` if it exists or activates a new instance with the given `config` otherwise.
    /// Activation entails setting the global default [`tracing::Subscriber`], of which there can be only one and it can't
    /// be changed once it is set.
    ///
    /// If a [`LatencyTrace`] has been previously activated in the same process, the `config` passed to this
    /// function will be ignored and the current active [`LatencyTrace`] will be returned.
    ///
    /// # Errors
    /// - [`ActivationError::HistogramConfigError`] if the `config`'s `hist_high` and `hist_sigfig` would cause
    /// [`hdrhistogram::Histogram::new_with_bounds`]`(1, hist_high, hist_sigfig)` to fail.
    /// - [`ActivationError::TracingSubscriberInitError`] if a global [`tracing::Subscriber`] is already set and its
    /// type is not the same as `Self`.
    pub fn activated(config: LatencyTraceCfg) -> Result<Self, ActivationError> {
        Ok(Self(LatencyTraceG::activated(config)?))
    }

    /// Returns the active instance of `Self` if it exists or activates a new instance with the default configuration otherwise.
    /// Activation entails setting the global default [`tracing::Subscriber`], of which there can be only one and it can't
    /// be changed once it is set.
    ///
    /// If a [`LatencyTrace`] has been previously activated in the same process, the default configuration
    /// will be ignored and the current active [`LatencyTrace`] will be returned.
    ///
    /// # Errors
    /// - [`ActivationError::TracingSubscriberInitError`] if a global [`tracing::Subscriber`] is already set and its
    /// type is not the same as `Self`.
    pub fn activated_default() -> Result<Self, ActivationError> {
        Ok(Self(LatencyTraceG::activated_default()?))
    }

    /// Executes the instrumented function `f` and, after `f` completes, returns the observed latencies.
    pub fn measure_latencies(&self, f: impl FnOnce()) -> Timings {
        self.0.measure_latencies(f)
    }

    /// Executes the instrumented function `f`, returning a [`ProbedTrace`] that allows partial latencies to be
    /// reported before `f` completes.
    pub fn measure_latencies_probed(
        self,
        f: impl FnOnce() + Send + 'static,
    ) -> Result<ProbedTrace, ActivationError> {
        let pt = ProbedTrace::new(self);
        let jh = thread::spawn(f);
        pt.set_join_handle(jh);
        Ok(pt)
    }
}

//==============
// pub impl for SpanGroup

impl SpanGroup {
    /// Name of this span group.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// ID of this span group.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// File name and code line of this span group.
    pub fn code_line(&self) -> &str {
        &self.code_line
    }

    /// Properties list of this span group.
    ///
    /// The list can be empty, as is the case with the default span grouper.
    pub fn props(&self) -> &[(String, String)] {
        &self.props
    }

    /// ID of this span group's parent, or `None` when it has no parent.
    pub fn parent_id(&self) -> Option<&str> {
        self.parent_id.as_deref()
    }

    /// Number of ancestor span groups this span group has.
    pub fn depth(&self) -> usize {
        self.depth
    }
}

//==============
// pub impl for TimingsView

impl<K> TimingsView<K> {
    /// Combines histogram values according to sets of keys that yield the same value when `f`
    /// is applied.
    ///
    /// For every entry `(k, v)` of `self`, the histogram `v` is added to the histogram stored
    /// under the aggregate key `f(k)` in the result. The first time an aggregate key is seen,
    /// an empty histogram with the same configuration as `v` is created for it.
    ///
    /// `f` is invoked exactly once per entry of `self`.
    ///
    /// # Panics
    /// Panics if adding a histogram to its aggregate fails, which should not happen given how
    /// the aggregate histograms are constructed.
    pub fn aggregate<G>(&self, f: impl Fn(&K) -> G) -> TimingsView<G>
    where
        G: Ord,
    {
        let mut res: BTreeMap<G, Histogram<u64>> = BTreeMap::new();
        for (k, v) in self.iter() {
            // A single `entry` lookup both finds an existing aggregate and creates a missing
            // one, so `f` never has to be re-evaluated to recover the key.
            res.entry(f(k))
                .or_insert_with(|| Histogram::new_from(v))
                .add(v)
                .expect("should not happen given histogram construction");
        }
        res.into()
    }

    /// Combines the histograms of `self` with those of another [`TimingsView`].
    ///
    /// Histograms of `other` whose key is already present in `self` are added to the
    /// corresponding histogram in `self`; the remaining entries of `other` are moved into
    /// `self` unchanged.
    ///
    /// # Panics
    /// Panics if adding two histograms that share a key fails.
    pub fn add(&mut self, mut other: TimingsView<K>)
    where
        K: Ord,
    {
        // Combine into self the values in other that have keys in self.
        for (k, h) in self.iter_mut() {
            if let Some(other_h) = other.remove(k) {
                h.add(other_h)
                    .expect("should not happen given histogram construction");
            }
        }

        // Insert into self the entries in other that don't have keys in self
        // (everything still left in `other` after the loop above).
        for (k, h) in other.0 {
            self.insert(k, h);
        }
    }

    /// Produces a map whose values are the [`SummaryStats`] of `self`'s histogram values.
    ///
    /// The keys of the resulting map are those of `self`.
    pub fn summary_stats(&self) -> Wrapper<BTreeMap<K, SummaryStats>>
    where
        K: Ord + Clone,
    {
        self.map_values(summary_stats)
    }
}

//==============
// pub impl for Timings

impl Timings {
    /// Checks whether an aggregation function `f` used in [`Self::aggregate`] is consistent according to the following
    /// definition:
    /// - the values resulting from applying `f` to span groups are called ***aggregate key***s
    /// - the sets of span groups corresponding to each *aggregate key* are called ***aggregates***.
    /// - an aggregation function is consistent if and only if, for each *aggregate*, all the span groups in the
    ///   *aggregate* have the same callsite.
    ///
    /// Callsites are compared through [`SpanGroup::code_line`]. The check stops at the first
    /// span group whose code line differs from the one first recorded for its aggregate, so `f`
    /// is not applied to the span groups that follow it. Returns `true` when `self` is empty.
    pub fn aggregator_is_consistent<G>(&self, f: impl Fn(&SpanGroup) -> G) -> bool
    where
        G: Ord,
    {
        // Code line of the first span group seen for each aggregate key.
        let mut aggregates: BTreeMap<G, &str> = BTreeMap::new();
        self.keys().all(|k| {
            let code_line = k.code_line();
            // `or_insert` records the code line for a new aggregate (trivially consistent) or
            // yields the previously recorded one for comparison -- one lookup in both cases.
            *aggregates.entry(f(k)).or_insert(code_line) == code_line
        })
    }

    /// Returns a map from span group ID to [`SpanGroup`].
    ///
    /// Both the IDs and the span groups in the returned map are owned copies.
    fn id_to_span_group(&self) -> BTreeMap<String, SpanGroup> {
        self.keys()
            .map(|k| (k.id().to_owned(), k.clone()))
            .collect()
    }

    /// Returns a map that associates each [`SpanGroup`] to its parent.
    ///
    /// Span groups without a parent ID are mapped to `None`.
    ///
    /// # Panics
    /// Panics if a span group's parent ID does not correspond to any span group in `self`.
    pub fn span_group_to_parent(&self) -> BTreeMap<SpanGroup, Option<SpanGroup>> {
        let id_to_sg = self.id_to_span_group();
        self.keys()
            .map(|sg| {
                let parent = sg.parent_id().map(|pid| {
                    id_to_sg
                        .get(pid)
                        .expect("`id_to_sg` must have key `pid` by construction")
                        .clone()
                });
                (sg.clone(), parent)
            })
            .collect()
    }
}