1use super::*;
2use crossbeam::sync::ShardedLock;
3use hdrhistogram::SyncHistogram;
4use indexmap::IndexMap;
5use slab::Slab;
6use std::cell::RefCell;
7use std::hash::Hash;
8use std::sync::atomic;
9
10thread_local! {
11 static SPAN: RefCell<Vec<span::Id>> = RefCell::new(Vec::new());
12}
13
14pub struct TimingSubscriber<S = group::ByName, E = group::ByMessage>
22where
23 S: SpanGroup,
24 E: EventGroup,
25 S::Id: Hash + Eq,
26 E::Id: Hash + Eq,
27{
28 spans: ShardedLock<Slab<SpanGroupContext<S::Id>>>,
29 timing: Timing<S, E>,
30}
31
32impl<S, E> std::fmt::Debug for TimingSubscriber<S, E>
33where
34 S: SpanGroup + std::fmt::Debug,
35 S::Id: Hash + Eq + std::fmt::Debug,
36 E: EventGroup + std::fmt::Debug,
37 E::Id: Hash + Eq + std::fmt::Debug,
38{
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("TimingSubscriber")
41 .field("spans", &self.spans)
42 .field("timing", &self.timing)
43 .finish()
44 }
45}
46
47#[derive(Debug)]
48struct SpanGroupContext<S> {
49 parent: Option<span::Id>,
50 follows: Option<span::Id>,
51 meta: &'static Metadata<'static>,
52 state: SpanState<S>,
53
54 refcount: atomic::AtomicUsize,
57}
58
59impl<S, E> TimingSubscriber<S, E>
60where
61 S: SpanGroup,
62 E: EventGroup,
63 S::Id: Hash + Eq,
64 E::Id: Hash + Eq,
65{
66 pub(crate) fn new(timing: Timing<S, E>) -> Self {
67 Self {
68 timing,
69 spans: Default::default(),
70 }
71 }
72
73 pub fn force_synchronize(&self) {
77 self.timing.force_synchronize()
78 }
79
80 pub fn with_histograms<F, R>(&self, f: F) -> R
91 where
92 F: FnOnce(&mut HashMap<S::Id, IndexMap<E::Id, SyncHistogram<u64>, Hasher>>) -> R,
93 {
94 self.timing.with_histograms(f)
95 }
96}
97
98impl<S, E> Subscriber for TimingSubscriber<S, E>
99where
100 S: SpanGroup + 'static,
101 E: EventGroup + 'static,
102 S::Id: Clone + Hash + Eq + 'static,
103 E::Id: Clone + Hash + Eq + 'static,
104{
105 fn enabled(&self, _: &Metadata) -> bool {
106 true
108 }
109
110 fn new_span(&self, span: &span::Attributes) -> span::Id {
111 let group = self.timing.span_group.group(span);
112 let parent = span
113 .parent()
114 .cloned()
115 .or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned()));
116
117 let sg = SpanGroupContext {
118 parent,
119 follows: None,
120 meta: span.metadata(),
121 refcount: atomic::AtomicUsize::new(1),
122 state: SpanState {
123 group: group.clone(),
124 last_event: atomic::AtomicU64::new(self.timing.time.raw()),
125 },
126 };
127
128 let id = {
129 let mut inner = self.spans.write().unwrap();
130 inner.insert(sg)
131 };
132
133 self.timing.ensure_group(group);
134 span::Id::from_u64(id as u64 + 1)
135 }
136
137 fn record(&self, _: &span::Id, _: &span::Record) {}
138
139 fn record_follows_from(&self, span: &span::Id, follows: &span::Id) {
140 let mut inner = self.spans.write().unwrap();
141 inner.get_mut(span_id_to_slab_idx(span)).unwrap().follows = Some(follows.clone());
142 }
143
144 fn event(&self, event: &Event) {
145 let span = event.parent().cloned().or_else(|| {
146 SPAN.with(|current_span| {
147 let current_span = current_span.borrow();
148 current_span.last().cloned()
149 })
150 });
151 if let Some(span) = span {
152 let inner = self.spans.read().unwrap();
153 let inner = &*inner;
154 self.timing.time(event, |on_each| {
155 let mut current = Some(span.clone());
156 while let Some(ref at) = current {
157 let idx = span_id_to_slab_idx(&at);
158 let span = &inner[idx];
159 if !on_each(&span.state) {
160 break;
161 }
162 current = span.parent.clone();
163 }
164 });
165 } else {
166 }
168 }
169
170 fn enter(&self, span: &span::Id) {
171 SPAN.with(|current_span| {
172 current_span.borrow_mut().push(span.clone());
173 })
174 }
175
176 fn exit(&self, span: &span::Id) {
177 SPAN.with(|current_span| {
179 let leaving = current_span
180 .borrow_mut()
181 .pop()
182 .expect("told to exit span when not in span");
183 assert_eq!(
184 &leaving, span,
185 "told to exit span that was not most recently entered"
186 );
187 })
188 }
189
190 fn clone_span(&self, span: &span::Id) -> span::Id {
191 let inner = self.spans.read().unwrap();
192 inner[span_id_to_slab_idx(span)]
193 .refcount
194 .fetch_add(1, atomic::Ordering::AcqRel);
195 span.clone()
196 }
197
198 fn try_close(&self, span: span::Id) -> bool {
199 macro_rules! unwinding_lock {
200 ($lock:expr) => {
201 match $lock {
202 Ok(g) => g,
203 Err(_) if std::thread::panicking() => {
204 return false;
208 }
209 r @ Err(_) => r.unwrap(),
210 }
211 };
212 }
213
214 if 1 == unwinding_lock!(self.spans.read())[span_id_to_slab_idx(&span)]
215 .refcount
216 .fetch_sub(1, atomic::Ordering::AcqRel)
217 {
218 if self.timing.span_close_events {
220 let inner = unwinding_lock!(self.spans.read());
222 if let Some(span_info) = inner.get(span_id_to_slab_idx(&span)) {
223 let meta = span_info.meta;
224 let fs = field::FieldSet::new(&["message"], meta.callsite());
225 let fld = fs.iter().next().unwrap();
226 let v = [(&fld, Some(&"close" as &dyn field::Value))];
227 let vs = fs.value_set(&v);
228 let e = Event::new_child_of(span.clone(), meta, &vs);
229 self.event(&e);
230 }
231 }
232
233 let mut inner = unwinding_lock!(self.spans.write());
235 inner.remove(span_id_to_slab_idx(&span));
236 true
238 } else {
239 false
240 }
241 }
242
243 fn current_span(&self) -> span::Current {
244 SPAN.with(|current_span| {
245 current_span.borrow_mut().last().map(|sid| {
246 span::Current::new(
247 sid.clone(),
248 self.spans.read().unwrap()[span_id_to_slab_idx(sid)].meta,
249 )
250 })
251 })
252 .unwrap_or_else(span::Current::none)
253 }
254}
255
/// A zero-sized token that carries the `S` and `E` type parameters of a
/// [`TimingSubscriber`], so that a `Dispatch` can later be downcast back to that
/// concrete subscriber type via [`Downcaster::downcast`].
///
/// The token stores no `S` or `E` values, so it is `Copy`, `Clone` and `Debug` for
/// every choice of `S` and `E`. The trait impls are written by hand because the
/// derives would add `S: Trait, E: Trait` bounds that the `PhantomData` field does
/// not need.
pub struct Downcaster<S, E> {
    phantom: PhantomData<fn(S, E)>,
}

impl<S, E> std::fmt::Debug for Downcaster<S, E> {
    /// Produces the same output as the derived implementation, without requiring
    /// `S: Debug` or `E: Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Downcaster")
            .field("phantom", &self.phantom)
            .finish()
    }
}

impl<S, E> Clone for Downcaster<S, E> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<S, E> Copy for Downcaster<S, E> {}
271
272impl<S, E> TimingSubscriber<S, E>
273where
274 S: SpanGroup,
275 E: EventGroup,
276 S::Id: Clone + Hash + Eq,
277 E::Id: Clone + Hash + Eq,
278{
279 pub fn downcaster(&self) -> Downcaster<S, E> {
305 Downcaster {
306 phantom: PhantomData,
307 }
308 }
309}
310
311impl<S, E> Downcaster<S, E>
312where
313 S: SpanGroup + 'static,
314 E: EventGroup + 'static,
315 S::Id: Clone + Hash + Eq + 'static,
316 E::Id: Clone + Hash + Eq + 'static,
317{
318 pub fn downcast<'a>(&self, d: &'a Dispatch) -> Option<&'a TimingSubscriber<S, E>> {
323 d.downcast_ref()
324 }
325}