Skip to main content

fast_telemetry/span/
collector.rs

1//! Thread-local span collector with zero-atomic submit path.
2//!
//! Each thread buffers completed spans in a thread-local [`Vec`] — avoiding atomics,
//! CAS loops, and cross-core cache-line contention.  When the buffer reaches
5//! [`FLUSH_THRESHOLD`] spans (or on thread exit), it is moved to a shared
6//! outbox that the exporter drains via [`SpanCollector::drain_into`].
7//!
8//! The outbox transfer uses a [`parking_lot::Mutex`] that is per-thread and
9//! therefore uncontended during normal operation (the exporter only touches it
10//! every few seconds).
11//!
12//! Each outbox is capped at [`OUTBOX_CAPACITY`] spans to bound memory.
13//! When the outbox is full, flushes are silently dropped.
14
15use std::borrow::Cow;
16use std::cell::UnsafeCell;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use super::context::SpanContext;
22use super::types::{CollectorRef, CompletedSpan, Span, SpanKind};
23use crate::metric::Counter;
24
/// How many spans accumulate in a thread-local buffer before they are moved
/// to the shared outbox.  Larger values spread the mutex cost over more
/// spans at the price of added latency before export.
const FLUSH_THRESHOLD: usize = 64;

/// Upper bound on the number of spans held in any single outbox.  A flush
/// that would push an outbox past this limit is silently discarded, keeping
/// memory at roughly `OUTBOX_CAPACITY × num_threads × sizeof(CompletedSpan)`.
const OUTBOX_CAPACITY: usize = 4096;
33
34/// A shared outbox that a single thread flushes into and the exporter drains.
35struct Outbox {
36    spans: Mutex<Vec<CompletedSpan>>,
37}
38
39impl Outbox {
40    fn new() -> Self {
41        Self {
42            spans: Mutex::new(Vec::with_capacity(FLUSH_THRESHOLD * 2)),
43        }
44    }
45}
46
47/// Per-thread buffer for a single [`SpanCollector`].
48struct ThreadBuffer {
49    /// Thread-local span buffer.  `push()` is a plain Vec append — zero atomics.
50    buffer: Vec<CompletedSpan>,
51    /// Shared outbox registered with the collector.
52    outbox: Arc<Outbox>,
53    /// Adaptive sampling: log2 of the sampling denominator.
54    /// 0 = record every span, 5 = 1/32, 6 = 1/64, 7 = 1/128.
55    sample_shift: u32,
56    /// Monotonic counter for sampling decisions.
57    span_counter: u64,
58}
59
60impl ThreadBuffer {
61    fn new(outbox: Arc<Outbox>) -> Self {
62        Self {
63            buffer: Vec::with_capacity(FLUSH_THRESHOLD),
64            outbox,
65            sample_shift: 0,
66            span_counter: 0,
67        }
68    }
69
70    /// Returns `true` if the next span should be recorded, based on the
71    /// current adaptive sampling rate.  Pure thread-local arithmetic —
72    /// zero atomics.
73    #[inline]
74    fn should_record(&mut self) -> bool {
75        self.span_counter = self.span_counter.wrapping_add(1);
76        if self.sample_shift == 0 {
77            return true;
78        }
79        (self.span_counter & ((1u64 << self.sample_shift) - 1)) == 0
80    }
81
82    #[inline]
83    fn push(&mut self, span: CompletedSpan) {
84        self.buffer.push(span);
85        if self.buffer.len() >= FLUSH_THRESHOLD {
86            self.flush();
87        }
88    }
89
90    fn flush(&mut self) {
91        if !self.buffer.is_empty() {
92            let mut outbox = self.outbox.spans.lock();
93            let occupancy = outbox.len();
94            if occupancy < OUTBOX_CAPACITY {
95                outbox.append(&mut self.buffer);
96            } else {
97                self.buffer.clear();
98            }
99            // Adjust sampling rate based on outbox pressure.
100            self.sample_shift = if occupancy <= OUTBOX_CAPACITY / 4 {
101                0 // ≤25% full — record everything
102            } else if occupancy <= OUTBOX_CAPACITY / 2 {
103                5 // ≤50% — 1/32
104            } else if occupancy <= OUTBOX_CAPACITY * 3 / 4 {
105                6 // ≤75% — 1/64
106            } else {
107                7 // >75% — 1/128
108            };
109        }
110    }
111}
112
impl Drop for ThreadBuffer {
    // Flush any remaining buffered spans when the buffer is torn down —
    // normally on thread exit, via `ThreadLocalState::drop`.
    fn drop(&mut self) {
        self.flush();
    }
}
118
/// Per-thread state: maps collector pointer → thread buffer.
///
/// Uses a raw pointer as key to avoid Arc overhead on the collector itself.
/// This is safe because `submit()` is called through a `CollectorRef` that
/// is guaranteed to outlive the span (see `CollectorRef` safety comments).
struct ThreadLocalState {
    /// Looked up by a linear scan over collector pointers (see
    /// `get_or_register`).  In practice there is one collector per process,
    /// so this is a single-element vec and the scan is one comparison.
    entries: Vec<(usize, ThreadBuffer)>,
}
129
130impl ThreadLocalState {
131    fn new() -> Self {
132        Self {
133            entries: Vec::new(),
134        }
135    }
136
137    #[inline]
138    fn get_or_register(&mut self, collector: &SpanCollector) -> &mut ThreadBuffer {
139        let key = collector as *const SpanCollector as usize;
140        // Fast path: check if we already have an entry for this collector.
141        let pos = self.entries.iter().position(|(k, _)| *k == key);
142        if let Some(pos) = pos {
143            return &mut self.entries[pos].1;
144        }
145        self.register(collector, key)
146    }
147
148    #[cold]
149    fn register(&mut self, collector: &SpanCollector, key: usize) -> &mut ThreadBuffer {
150        // Register a new outbox with the collector.
151        let outbox = Arc::new(Outbox::new());
152        collector.outboxes.lock().push(Arc::clone(&outbox));
153        self.entries.push((key, ThreadBuffer::new(outbox)));
154        &mut self.entries.last_mut().expect("just pushed").1
155    }
156}
157
158impl Drop for ThreadLocalState {
159    fn drop(&mut self) {
160        // Flush all remaining spans on thread exit.
161        for (_, buffer) in &mut self.entries {
162            buffer.flush();
163        }
164    }
165}
166
thread_local! {
    /// Thread-local span state.
    ///
    /// Uses `UnsafeCell` rather than `RefCell` to skip the runtime borrow
    /// check on the per-span hot path (~5 cycles saved per submit). Safety
    /// is preserved by enforcing two invariants:
    ///   1. All access is single-threaded (`thread_local!`).
    ///   2. Every `&mut` borrow is fully consumed inside the closure passed
    ///      to `LOCAL.with`. No closure recursively re-enters `with`, and
    ///      none of the called methods (`get_or_register`, `push`,
    ///      `should_record`) re-enter the cell.
    ///
    /// `ThreadLocalState::drop` runs on thread exit and flushes any spans
    /// still buffered at that point to their outboxes.
    static LOCAL: UnsafeCell<ThreadLocalState> = UnsafeCell::new(ThreadLocalState::new());
}
180
181/// Thread-local span collector with zero-atomic submit path.
182///
183/// Completed spans are buffered in a thread-local [`Vec`] and periodically
184/// flushed to a shared outbox.  The exporter calls
185/// [`drain_into`](SpanCollector::drain_into) to harvest all pending spans.
186///
187/// Created explicitly and held as `Arc<SpanCollector>`.
188pub struct SpanCollector {
189    /// Registered per-thread outboxes.  Lock is taken only when:
190    /// (a) a new thread first submits a span (registration), or
191    /// (b) the exporter drains spans.
192    outboxes: Mutex<Vec<Arc<Outbox>>>,
193    /// Spans that were recorded (passed sampling).
194    spans_recorded: Counter,
195    /// Spans that were dropped by adaptive sampling.
196    spans_sampled_out: Counter,
197}
198
impl SpanCollector {
    /// Create a new collector.
    ///
    /// The `_num_shards` and `_capacity_per_shard` parameters are accepted for
    /// API compatibility but are no longer used — each thread gets its own
    /// buffer automatically, and each shared outbox is capped at
    /// [`OUTBOX_CAPACITY`] spans (see `ThreadBuffer::flush`), not sized by
    /// these arguments.
    pub fn new(_num_shards: usize, _capacity_per_shard: usize) -> Self {
        Self {
            outboxes: Mutex::new(Vec::new()),
            // NOTE(review): `8` is presumably a shard/stripe count for the
            // counter — confirm against `crate::metric::Counter::new`.
            spans_recorded: Counter::new(8),
            spans_sampled_out: Counter::new(8),
        }
    }

    /// Create a new root span with a fresh trace ID.
    ///
    /// The span is associated with this collector and will be submitted
    /// here when it drops.  Under high load, adaptive sampling may return
    /// a no-op span that skips all recording and submission.
    pub fn start_span(
        self: &Arc<Self>,
        name: impl Into<Cow<'static, str>>,
        kind: SpanKind,
    ) -> Span {
        let collector_ref = CollectorRef::from_arc(self);
        if self.should_record() {
            self.spans_recorded.inc();
            Span::new_root(name, kind, collector_ref)
        } else {
            self.spans_sampled_out.inc();
            Span::noop(collector_ref)
        }
    }

    /// Create a root span from an incoming W3C `traceparent` header.
    ///
    /// If the header is valid, the span inherits the remote trace ID and sets
    /// `parent_span_id` to the remote span ID.  If the header is `None` or
    /// invalid, behaves like [`start_span`](Self::start_span) (new trace ID).
    ///
    /// Adaptive sampling applies: under load, may return a no-op span.
    pub fn start_span_from_traceparent(
        self: &Arc<Self>,
        traceparent: Option<&str>,
        name: impl Into<Cow<'static, str>>,
        kind: SpanKind,
    ) -> Span {
        let collector_ref = CollectorRef::from_arc(self);
        if !self.should_record() {
            self.spans_sampled_out.inc();
            return Span::noop(collector_ref);
        }
        self.spans_recorded.inc();
        // Invalid or absent headers fall back to a brand-new root trace.
        match traceparent.and_then(SpanContext::from_traceparent) {
            Some(remote_ctx) => Span::new_from_remote(name, kind, remote_ctx, collector_ref),
            None => Span::new_root(name, kind, collector_ref),
        }
    }

    /// Check the thread-local adaptive sampling counter.
    ///
    /// Returns `true` if the next span should be recorded. Pure thread-local
    /// arithmetic — zero atomics, zero contention.  Note: this registers an
    /// outbox for the calling thread on first use, even for sampled-out spans.
    #[inline]
    fn should_record(&self) -> bool {
        LOCAL.with(|cell| {
            // SAFETY: see LOCAL definition. Single-thread access; the &mut
            // borrow is fully consumed before this closure returns.
            let state = unsafe { &mut *cell.get() };
            state.get_or_register(self).should_record()
        })
    }

    /// Submit a completed span.  Called by [`Span::drop`].
    ///
    /// Pushes to a thread-local `Vec` — zero atomics on the fast path.
    /// Every [`FLUSH_THRESHOLD`] spans, the buffer is moved to the shared
    /// outbox under a per-thread mutex (uncontended).
    #[inline]
    pub(crate) fn submit(&self, span: CompletedSpan) {
        LOCAL.with(|cell| {
            // SAFETY: see LOCAL definition. Single-thread access; the &mut
            // borrow is fully consumed before this closure returns.
            let state = unsafe { &mut *cell.get() };
            state.get_or_register(self).push(span);
        });
    }

    /// Flush the current thread's local buffer to the shared outbox.
    ///
    /// Call this before [`drain_into`](Self::drain_into) when running on the
    /// same thread that submitted spans (e.g., in tests or single-threaded
    /// exporters).  In production, thread-local buffers are flushed
    /// automatically when they reach [`FLUSH_THRESHOLD`] or on thread exit.
    pub fn flush_local(&self) {
        LOCAL.with(|cell| {
            // SAFETY: see LOCAL definition.
            let state = unsafe { &mut *cell.get() };
            let key = self as *const SpanCollector as usize;
            // Only flush if this thread has actually registered a buffer.
            if let Some(pos) = state.entries.iter().position(|(k, _)| *k == key) {
                state.entries[pos].1.flush();
            }
        });
    }

    /// Drain all pending spans into the provided buffer.
    ///
    /// This is the primary method for exporters.  It collects spans from all
    /// registered thread outboxes.  Spans still in thread-local buffers below
    /// the flush threshold are NOT included unless [`flush_local`](Self::flush_local)
    /// is called first (or the thread exits).
    ///
    /// The caller can reuse the buffer across export cycles to avoid repeated
    /// allocation.
    ///
    /// Lock order is `outboxes` then each `spans` mutex; registration takes
    /// only `outboxes` and flushes take only `spans`, so no inversion occurs.
    pub fn drain_into(&self, buf: &mut Vec<CompletedSpan>) {
        let outboxes = self.outboxes.lock();
        for outbox in outboxes.iter() {
            let mut spans = outbox.spans.lock();
            buf.append(&mut spans);
            // Release excess capacity so drained outboxes don't hold onto
            // large allocations between export cycles.
            spans.shrink_to(FLUSH_THRESHOLD * 2);
        }
    }

    /// Number of spans that were dropped.
    ///
    /// Always returns 0.  Retained for API compatibility; use
    /// [`sampled_out_count`](Self::sampled_out_count) for adaptive sampling stats.
    /// NOTE(review): batches discarded because an outbox was full (see
    /// `ThreadBuffer::flush`) are not counted anywhere, so this can
    /// under-report actual drops.
    pub fn dropped_count(&self) -> u64 {
        0
    }

    /// Total spans that passed adaptive sampling and were recorded.
    pub fn recorded_count(&self) -> u64 {
        self.spans_recorded.sum() as u64
    }

    /// Total spans that were dropped by adaptive sampling.
    pub fn sampled_out_count(&self) -> u64 {
        self.spans_sampled_out.sum() as u64
    }

    /// Current number of spans waiting across all outboxes.
    ///
    /// Does not include spans still in thread-local buffers that haven't
    /// been flushed yet.
    pub fn len(&self) -> usize {
        let outboxes = self.outboxes.lock();
        outboxes.iter().map(|o| o.spans.lock().len()).sum()
    }

    /// Returns `true` if all outboxes are empty.
    ///
    /// Does not account for spans in thread-local buffers below the flush
    /// threshold.
    pub fn is_empty(&self) -> bool {
        let outboxes = self.outboxes.lock();
        outboxes.iter().all(|o| o.spans.lock().is_empty())
    }
}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Same-thread convenience: force the local buffer out, then harvest
    /// everything from the outboxes.
    fn flush_and_drain(collector: &SpanCollector, buf: &mut Vec<CompletedSpan>) {
        collector.flush_local();
        collector.drain_into(buf);
    }

    #[test]
    fn start_and_drain() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _first = collector.start_span("op1", SpanKind::Server);
            let _second = collector.start_span("op2", SpanKind::Client);
        }
        let mut drained = Vec::new();
        flush_and_drain(&collector, &mut drained);
        assert_eq!(drained.len(), 2);
    }

    #[test]
    fn small_batches_no_drops() {
        let collector = Arc::new(SpanCollector::new(1, 2));
        // Batches well below OUTBOX_CAPACITY must all survive.
        {
            let _a = collector.start_span("a", SpanKind::Internal);
            let _b = collector.start_span("b", SpanKind::Internal);
            let _c = collector.start_span("c", SpanKind::Internal);
        }
        let mut drained = Vec::new();
        flush_and_drain(&collector, &mut drained);
        assert_eq!(drained.len(), 3);
    }

    #[test]
    fn from_traceparent_valid() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        {
            let _span =
                collector.start_span_from_traceparent(Some(tp), "handler", SpanKind::Server);
        }
        let mut drained = Vec::new();
        flush_and_drain(&collector, &mut drained);
        assert_eq!(drained.len(), 1);
        let trace_id = drained[0].trace_id.to_string();
        assert_eq!(trace_id, "4bf92f3577b34da6a3ce929d0e0e4736");
    }

    #[test]
    fn from_traceparent_invalid_falls_back() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _span =
                collector.start_span_from_traceparent(Some("garbage"), "handler", SpanKind::Server);
        }
        let mut drained = Vec::new();
        flush_and_drain(&collector, &mut drained);
        assert_eq!(drained.len(), 1);
        // A fresh (valid) trace ID was generated, with no parent.
        assert!(!drained[0].trace_id.is_invalid());
        assert!(drained[0].parent_span_id.is_invalid());
    }

    #[test]
    fn from_traceparent_none_creates_root() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _span = collector.start_span_from_traceparent(None, "handler", SpanKind::Server);
        }
        let mut drained = Vec::new();
        flush_and_drain(&collector, &mut drained);
        assert_eq!(drained.len(), 1);
        assert!(drained[0].parent_span_id.is_invalid());
    }

    #[test]
    fn concurrent_submission() {
        let collector = Arc::new(SpanCollector::new(8, 1024));

        let handles: Vec<_> = (0..4)
            .map(|t| {
                let c = Arc::clone(&collector);
                std::thread::spawn(move || {
                    for i in 0..100 {
                        let _span =
                            c.start_span(format!("thread_{}_span_{}", t, i), SpanKind::Internal);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread join");
        }

        // Thread-local Drop flushes on thread exit, so drain_into alone sees
        // every span.
        let mut drained = Vec::new();
        collector.drain_into(&mut drained);
        assert_eq!(drained.len(), 400);
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn flush_threshold_batching() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        // One below the threshold: every span stays in the thread-local buffer.
        for _ in 1..FLUSH_THRESHOLD {
            let _span = collector.start_span("sub_threshold", SpanKind::Internal);
        }
        assert_eq!(collector.len(), 0);

        // Crossing the threshold moves the whole batch to the outbox.
        {
            let _span = collector.start_span("trigger", SpanKind::Internal);
        }
        assert_eq!(collector.len(), FLUSH_THRESHOLD);
    }

    #[test]
    fn flush_local_forces_transfer() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        for _ in 0..5 {
            let _span = collector.start_span("local", SpanKind::Internal);
        }
        // Below threshold: nothing has reached the outbox yet.
        assert_eq!(collector.len(), 0);
        collector.flush_local();
        assert_eq!(collector.len(), 5);
    }

    #[test]
    fn thread_exit_flushes() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        let c = Arc::clone(&collector);
        std::thread::spawn(move || {
            // Fewer than FLUSH_THRESHOLD spans; the thread-local Drop must
            // flush them when this thread exits.
            for _ in 0..10 {
                let _span = c.start_span("thread_exit", SpanKind::Internal);
            }
        })
        .join()
        .expect("thread join");

        let mut drained = Vec::new();
        collector.drain_into(&mut drained);
        assert_eq!(drained.len(), 10);
    }
}