tracing_timing/
subscriber.rs

1use super::*;
2use crossbeam::sync::ShardedLock;
3use hdrhistogram::SyncHistogram;
4use indexmap::IndexMap;
5use slab::Slab;
6use std::cell::RefCell;
7use std::hash::Hash;
8use std::sync::atomic;
9
10thread_local! {
11    static SPAN: RefCell<Vec<span::Id>> = RefCell::new(Vec::new());
12}
13
14/// Timing-gathering tracing subscriber.
15///
16/// This type is constructed using a [`Builder`].
17///
18/// See the [crate-level docs] for details.
19///
20///   [crate-level docs]: ../
21pub struct TimingSubscriber<S = group::ByName, E = group::ByMessage>
22where
23    S: SpanGroup,
24    E: EventGroup,
25    S::Id: Hash + Eq,
26    E::Id: Hash + Eq,
27{
28    spans: ShardedLock<Slab<SpanGroupContext<S::Id>>>,
29    timing: Timing<S, E>,
30}
31
32impl<S, E> std::fmt::Debug for TimingSubscriber<S, E>
33where
34    S: SpanGroup + std::fmt::Debug,
35    S::Id: Hash + Eq + std::fmt::Debug,
36    E: EventGroup + std::fmt::Debug,
37    E::Id: Hash + Eq + std::fmt::Debug,
38{
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("TimingSubscriber")
41            .field("spans", &self.spans)
42            .field("timing", &self.timing)
43            .finish()
44    }
45}
46
47#[derive(Debug)]
48struct SpanGroupContext<S> {
49    parent: Option<span::Id>,
50    follows: Option<span::Id>,
51    meta: &'static Metadata<'static>,
52    state: SpanState<S>,
53
54    // how many references are there to each span id?
55    // needed so we know when to reclaim
56    refcount: atomic::AtomicUsize,
57}
58
59impl<S, E> TimingSubscriber<S, E>
60where
61    S: SpanGroup,
62    E: EventGroup,
63    S::Id: Hash + Eq,
64    E::Id: Hash + Eq,
65{
66    pub(crate) fn new(timing: Timing<S, E>) -> Self {
67        Self {
68            timing,
69            spans: Default::default(),
70        }
71    }
72
73    /// Force all current timing information to be refreshed immediately.
74    ///
75    /// Note that this will interrupt all concurrent metrics gathering until it returns.
76    pub fn force_synchronize(&self) {
77        self.timing.force_synchronize()
78    }
79
80    /// Access the timing histograms.
81    ///
82    /// Be aware that the contained histograms are not automatically updated to reflect recently
83    /// gathered samples. For each histogram you wish to read from, you must call `refresh` or
84    /// `refresh_timeout` or `force_synchronize` to gather up-to-date samples.
85    ///
86    /// For information about what you can do with the histograms, see the [`hdrhistogram`
87    /// documentation].
88    ///
89    ///   [`hdrhistogram` documentation]: https://docs.rs/hdrhistogram/
90    pub fn with_histograms<F, R>(&self, f: F) -> R
91    where
92        F: FnOnce(&mut HashMap<S::Id, IndexMap<E::Id, SyncHistogram<u64>, Hasher>>) -> R,
93    {
94        self.timing.with_histograms(f)
95    }
96}
97
98impl<S, E> Subscriber for TimingSubscriber<S, E>
99where
100    S: SpanGroup + 'static,
101    E: EventGroup + 'static,
102    S::Id: Clone + Hash + Eq + 'static,
103    E::Id: Clone + Hash + Eq + 'static,
104{
105    fn enabled(&self, _: &Metadata) -> bool {
106        // NOTE: we override this just because we have to. for filtering, use Layer
107        true
108    }
109
110    fn new_span(&self, span: &span::Attributes) -> span::Id {
111        let group = self.timing.span_group.group(span);
112        let parent = span
113            .parent()
114            .cloned()
115            .or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned()));
116
117        let sg = SpanGroupContext {
118            parent,
119            follows: None,
120            meta: span.metadata(),
121            refcount: atomic::AtomicUsize::new(1),
122            state: SpanState {
123                group: group.clone(),
124                last_event: atomic::AtomicU64::new(self.timing.time.raw()),
125            },
126        };
127
128        let id = {
129            let mut inner = self.spans.write().unwrap();
130            inner.insert(sg)
131        };
132
133        self.timing.ensure_group(group);
134        span::Id::from_u64(id as u64 + 1)
135    }
136
137    fn record(&self, _: &span::Id, _: &span::Record) {}
138
139    fn record_follows_from(&self, span: &span::Id, follows: &span::Id) {
140        let mut inner = self.spans.write().unwrap();
141        inner.get_mut(span_id_to_slab_idx(span)).unwrap().follows = Some(follows.clone());
142    }
143
144    fn event(&self, event: &Event) {
145        let span = event.parent().cloned().or_else(|| {
146            SPAN.with(|current_span| {
147                let current_span = current_span.borrow();
148                current_span.last().cloned()
149            })
150        });
151        if let Some(span) = span {
152            let inner = self.spans.read().unwrap();
153            let inner = &*inner;
154            self.timing.time(event, |on_each| {
155                let mut current = Some(span.clone());
156                while let Some(ref at) = current {
157                    let idx = span_id_to_slab_idx(&at);
158                    let span = &inner[idx];
159                    if !on_each(&span.state) {
160                        break;
161                    }
162                    current = span.parent.clone();
163                }
164            });
165        } else {
166            // recorded free-standing event -- ignoring
167        }
168    }
169
170    fn enter(&self, span: &span::Id) {
171        SPAN.with(|current_span| {
172            current_span.borrow_mut().push(span.clone());
173        })
174    }
175
176    fn exit(&self, span: &span::Id) {
177        // we are guaranteed that on any given thread, spans are exited in reverse order
178        SPAN.with(|current_span| {
179            let leaving = current_span
180                .borrow_mut()
181                .pop()
182                .expect("told to exit span when not in span");
183            assert_eq!(
184                &leaving, span,
185                "told to exit span that was not most recently entered"
186            );
187        })
188    }
189
190    fn clone_span(&self, span: &span::Id) -> span::Id {
191        let inner = self.spans.read().unwrap();
192        inner[span_id_to_slab_idx(span)]
193            .refcount
194            .fetch_add(1, atomic::Ordering::AcqRel);
195        span.clone()
196    }
197
198    fn try_close(&self, span: span::Id) -> bool {
199        macro_rules! unwinding_lock {
200            ($lock:expr) => {
201                match $lock {
202                    Ok(g) => g,
203                    Err(_) if std::thread::panicking() => {
204                        // we're trying to take the span lock while panicking
205                        // the lock is poisoned, so the writer state is corrupt
206                        // so we might as well just return -- nothing more we can do
207                        return false;
208                    }
209                    r @ Err(_) => r.unwrap(),
210                }
211            };
212        }
213
214        if 1 == unwinding_lock!(self.spans.read())[span_id_to_slab_idx(&span)]
215            .refcount
216            .fetch_sub(1, atomic::Ordering::AcqRel)
217        {
218            // span has ended!
219            if self.timing.span_close_events {
220                // record a span-end event
221                let inner = unwinding_lock!(self.spans.read());
222                if let Some(span_info) = inner.get(span_id_to_slab_idx(&span)) {
223                    let meta = span_info.meta;
224                    let fs = field::FieldSet::new(&["message"], meta.callsite());
225                    let fld = fs.iter().next().unwrap();
226                    let v = [(&fld, Some(&"close" as &dyn field::Value))];
227                    let vs = fs.value_set(&v);
228                    let e = Event::new_child_of(span.clone(), meta, &vs);
229                    self.event(&e);
230                }
231            }
232
233            // reclaim the span's id
234            let mut inner = unwinding_lock!(self.spans.write());
235            inner.remove(span_id_to_slab_idx(&span));
236            // we _keep_ the entry in inner.recorders in place, since it may be used by other spans
237            true
238        } else {
239            false
240        }
241    }
242
243    fn current_span(&self) -> span::Current {
244        SPAN.with(|current_span| {
245            current_span.borrow_mut().last().map(|sid| {
246                span::Current::new(
247                    sid.clone(),
248                    self.spans.read().unwrap()[span_id_to_slab_idx(sid)].meta,
249                )
250            })
251        })
252        .unwrap_or_else(span::Current::none)
253    }
254}
255
256/// A convenience type for getting access to [`TimingSubscriber`] through a `Dispatch`.
257///
258/// See [`TimingSubscriber::downcaster`].
259#[derive(Debug, Copy)]
260pub struct Downcaster<S, E> {
261    phantom: PhantomData<fn(S, E)>,
262}
263
264impl<S, E> Clone for Downcaster<S, E> {
265    fn clone(&self) -> Self {
266        Self {
267            phantom: PhantomData,
268        }
269    }
270}
271
272impl<S, E> TimingSubscriber<S, E>
273where
274    S: SpanGroup,
275    E: EventGroup,
276    S::Id: Clone + Hash + Eq,
277    E::Id: Clone + Hash + Eq,
278{
279    /// Returns an identifier that can later be used to get access to this [`TimingSubscriber`]
280    /// after it has been turned into a `tracing::Dispatch`.
281    ///
282    /// ```rust
283    /// use tracing::*;
284    /// use tracing_timing::{Builder, Histogram, TimingSubscriber};
285    /// let subscriber = Builder::default().build(|| Histogram::new_with_max(1_000_000, 2).unwrap());
286    /// let downcaster = subscriber.downcaster();
287    /// let dispatch = Dispatch::new(subscriber);
288    /// // ...
289    /// // code that hands off clones of the dispatch
290    /// // maybe to other threads
291    /// // ...
292    /// downcaster.downcast(&dispatch).unwrap().with_histograms(|hs| {
293    ///     for (span_group, hs) in hs {
294    ///         for (event_group, h) in hs {
295    ///             // make sure we see the latest samples:
296    ///             h.refresh();
297    ///             // print the median:
298    ///             println!("{} -> {}: {}ns", span_group, event_group, h.value_at_quantile(0.5))
299    ///         }
300    ///     }
301    /// });
302    /// ```
303    ///
304    pub fn downcaster(&self) -> Downcaster<S, E> {
305        Downcaster {
306            phantom: PhantomData,
307        }
308    }
309}
310
311impl<S, E> Downcaster<S, E>
312where
313    S: SpanGroup + 'static,
314    E: EventGroup + 'static,
315    S::Id: Clone + Hash + Eq + 'static,
316    E::Id: Clone + Hash + Eq + 'static,
317{
318    /// Retrieve a reference to this ident's original [`TimingSubscriber`].
319    ///
320    /// This method returns `None` if the given `Dispatch` is not holding a subscriber of the same
321    /// type as this ident was created from.
322    pub fn downcast<'a>(&self, d: &'a Dispatch) -> Option<&'a TimingSubscriber<S, E>> {
323        d.downcast_ref()
324    }
325}