// fast_telemetry/span/collector.rs

//! Thread-local span collector with zero-atomic submit path.
//!
//! Each thread buffers completed spans in a thread-local [`Vec`], avoiding
//! atomics, CAS loops, and (in the common case) cache-line contention.  When the
//! buffer reaches [`FLUSH_THRESHOLD`] spans (or on thread exit), it is moved to a
//! shared outbox that the exporter drains via [`SpanCollector::drain_into`].
//!
//! Each thread has its own outbox, guarded by its own [`parking_lot::Mutex`].
//! Only that thread flushes into it and the exporter only touches it while
//! draining, so the lock is uncontended during normal operation.
//!
//! Each outbox is capped at [`OUTBOX_CAPACITY`] spans to bound memory.  When an
//! outbox is full, flushed batches are silently dropped.  To make that rare, each
//! thread lowers its sampling rate (down to 1 in 128 spans) as its outbox fills.
14
15use std::borrow::Cow;
16use std::cell::RefCell;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use super::context::SpanContext;
22use super::types::{CollectorRef, CompletedSpan, Span, SpanKind};
23use crate::metric::Counter;
24
/// How many spans a thread accumulates locally before moving them to its
/// shared outbox.  Larger values spread the mutex cost over more spans, at the
/// price of a longer delay before those spans become visible to the exporter.
const FLUSH_THRESHOLD: usize = 64;

/// Upper bound on the number of spans held by a single outbox.  A flush that
/// would push an outbox past this limit is discarded as a whole, which bounds
/// memory to roughly `OUTBOX_CAPACITY × num_threads × sizeof(CompletedSpan)`.
const OUTBOX_CAPACITY: usize = 4 * 1024;
33
34/// A shared outbox that a single thread flushes into and the exporter drains.
35struct Outbox {
36    spans: Mutex<Vec<CompletedSpan>>,
37}
38
39impl Outbox {
40    fn new() -> Self {
41        Self {
42            spans: Mutex::new(Vec::with_capacity(FLUSH_THRESHOLD * 2)),
43        }
44    }
45}
46
47/// Per-thread buffer for a single [`SpanCollector`].
48struct ThreadBuffer {
49    /// Thread-local span buffer.  `push()` is a plain Vec append — zero atomics.
50    buffer: Vec<CompletedSpan>,
51    /// Shared outbox registered with the collector.
52    outbox: Arc<Outbox>,
53    /// Adaptive sampling: log2 of the sampling denominator.
54    /// 0 = record every span, 5 = 1/32, 6 = 1/64, 7 = 1/128.
55    sample_shift: u32,
56    /// Monotonic counter for sampling decisions.
57    span_counter: u64,
58}
59
60impl ThreadBuffer {
61    fn new(outbox: Arc<Outbox>) -> Self {
62        Self {
63            buffer: Vec::with_capacity(FLUSH_THRESHOLD),
64            outbox,
65            sample_shift: 0,
66            span_counter: 0,
67        }
68    }
69
70    /// Returns `true` if the next span should be recorded, based on the
71    /// current adaptive sampling rate.  Pure thread-local arithmetic —
72    /// zero atomics.
73    #[inline]
74    fn should_record(&mut self) -> bool {
75        self.span_counter = self.span_counter.wrapping_add(1);
76        if self.sample_shift == 0 {
77            return true;
78        }
79        (self.span_counter & ((1u64 << self.sample_shift) - 1)) == 0
80    }
81
82    #[inline]
83    fn push(&mut self, span: CompletedSpan) {
84        self.buffer.push(span);
85        if self.buffer.len() >= FLUSH_THRESHOLD {
86            self.flush();
87        }
88    }
89
90    fn flush(&mut self) {
91        if !self.buffer.is_empty() {
92            let mut outbox = self.outbox.spans.lock();
93            let occupancy = outbox.len();
94            if occupancy < OUTBOX_CAPACITY {
95                outbox.append(&mut self.buffer);
96            } else {
97                self.buffer.clear();
98            }
99            // Adjust sampling rate based on outbox pressure.
100            self.sample_shift = if occupancy <= OUTBOX_CAPACITY / 4 {
101                0 // ≤25% full — record everything
102            } else if occupancy <= OUTBOX_CAPACITY / 2 {
103                5 // ≤50% — 1/32
104            } else if occupancy <= OUTBOX_CAPACITY * 3 / 4 {
105                6 // ≤75% — 1/64
106            } else {
107                7 // >75% — 1/128
108            };
109        }
110    }
111}
112
113impl Drop for ThreadBuffer {
114    fn drop(&mut self) {
115        self.flush();
116    }
117}
118
119/// Per-thread state: maps collector pointer → thread buffer.
120///
121/// Uses a raw pointer as key to avoid Arc overhead on the collector itself.
122/// This is safe because `submit()` is called through a `CollectorRef` that
123/// is guaranteed to outlive the span (see `CollectorRef` safety comments).
124struct ThreadLocalState {
125    /// Sorted by collector pointer for binary search.  In practice there is
126    /// one collector per process, so this is a single-element vec.
127    entries: Vec<(usize, ThreadBuffer)>,
128}
129
130impl ThreadLocalState {
131    fn new() -> Self {
132        Self {
133            entries: Vec::new(),
134        }
135    }
136
137    #[inline]
138    fn get_or_register(&mut self, collector: &SpanCollector) -> &mut ThreadBuffer {
139        let key = collector as *const SpanCollector as usize;
140        // Fast path: check if we already have an entry for this collector.
141        let pos = self.entries.iter().position(|(k, _)| *k == key);
142        if let Some(pos) = pos {
143            return &mut self.entries[pos].1;
144        }
145        self.register(collector, key)
146    }
147
148    #[cold]
149    fn register(&mut self, collector: &SpanCollector, key: usize) -> &mut ThreadBuffer {
150        // Register a new outbox with the collector.
151        let outbox = Arc::new(Outbox::new());
152        collector.outboxes.lock().push(Arc::clone(&outbox));
153        self.entries.push((key, ThreadBuffer::new(outbox)));
154        &mut self.entries.last_mut().expect("just pushed").1
155    }
156}
157
158impl Drop for ThreadLocalState {
159    fn drop(&mut self) {
160        // Flush all remaining spans on thread exit.
161        for (_, buffer) in &mut self.entries {
162            buffer.flush();
163        }
164    }
165}
166
167thread_local! {
168    static LOCAL: RefCell<ThreadLocalState> = RefCell::new(ThreadLocalState::new());
169}
170
171/// Thread-local span collector with zero-atomic submit path.
172///
173/// Completed spans are buffered in a thread-local [`Vec`] and periodically
174/// flushed to a shared outbox.  The exporter calls
175/// [`drain_into`](SpanCollector::drain_into) to harvest all pending spans.
176///
177/// Created explicitly and held as `Arc<SpanCollector>`.
178pub struct SpanCollector {
179    /// Registered per-thread outboxes.  Lock is taken only when:
180    /// (a) a new thread first submits a span (registration), or
181    /// (b) the exporter drains spans.
182    outboxes: Mutex<Vec<Arc<Outbox>>>,
183    /// Spans that were recorded (passed sampling).
184    spans_recorded: Counter,
185    /// Spans that were dropped by adaptive sampling.
186    spans_sampled_out: Counter,
187}
188
189impl SpanCollector {
190    /// Create a new collector.
191    ///
192    /// The `_num_shards` and `_capacity_per_shard` parameters are accepted for
193    /// API compatibility but are no longer used — each thread gets its own
194    /// buffer automatically, and buffers are unbounded.
195    pub fn new(_num_shards: usize, _capacity_per_shard: usize) -> Self {
196        Self {
197            outboxes: Mutex::new(Vec::new()),
198            spans_recorded: Counter::new(8),
199            spans_sampled_out: Counter::new(8),
200        }
201    }
202
203    /// Create a new root span with a fresh trace ID.
204    ///
205    /// The span is associated with this collector and will be submitted
206    /// here when it drops.  Under high load, adaptive sampling may return
207    /// a no-op span that skips all recording and submission.
208    pub fn start_span(
209        self: &Arc<Self>,
210        name: impl Into<Cow<'static, str>>,
211        kind: SpanKind,
212    ) -> Span {
213        let collector_ref = CollectorRef::from_arc(self);
214        if self.should_record() {
215            self.spans_recorded.inc();
216            Span::new_root(name, kind, collector_ref)
217        } else {
218            self.spans_sampled_out.inc();
219            Span::noop(collector_ref)
220        }
221    }
222
223    /// Create a root span from an incoming W3C `traceparent` header.
224    ///
225    /// If the header is valid, the span inherits the remote trace ID and sets
226    /// `parent_span_id` to the remote span ID.  If the header is `None` or
227    /// invalid, behaves like [`start_span`](Self::start_span) (new trace ID).
228    ///
229    /// Adaptive sampling applies: under load, may return a no-op span.
230    pub fn start_span_from_traceparent(
231        self: &Arc<Self>,
232        traceparent: Option<&str>,
233        name: impl Into<Cow<'static, str>>,
234        kind: SpanKind,
235    ) -> Span {
236        let collector_ref = CollectorRef::from_arc(self);
237        if !self.should_record() {
238            self.spans_sampled_out.inc();
239            return Span::noop(collector_ref);
240        }
241        self.spans_recorded.inc();
242        match traceparent.and_then(SpanContext::from_traceparent) {
243            Some(remote_ctx) => Span::new_from_remote(name, kind, remote_ctx, collector_ref),
244            None => Span::new_root(name, kind, collector_ref),
245        }
246    }
247
248    /// Check the thread-local adaptive sampling counter.
249    ///
250    /// Returns `true` if the next span should be recorded. Pure thread-local
251    /// arithmetic — zero atomics, zero contention.
252    #[inline]
253    fn should_record(&self) -> bool {
254        LOCAL.with(|cell| cell.borrow_mut().get_or_register(self).should_record())
255    }
256
257    /// Submit a completed span.  Called by [`Span::drop`].
258    ///
259    /// Pushes to a thread-local `Vec` — zero atomics on the fast path.
260    /// Every [`FLUSH_THRESHOLD`] spans, the buffer is moved to the shared
261    /// outbox under a per-thread mutex (uncontended).
262    #[inline]
263    pub(crate) fn submit(&self, span: CompletedSpan) {
264        LOCAL.with(|cell| {
265            cell.borrow_mut().get_or_register(self).push(span);
266        });
267    }
268
269    /// Flush the current thread's local buffer to the shared outbox.
270    ///
271    /// Call this before [`drain_into`](Self::drain_into) when running on the
272    /// same thread that submitted spans (e.g., in tests or single-threaded
273    /// exporters).  In production, thread-local buffers are flushed
274    /// automatically when they reach [`FLUSH_THRESHOLD`] or on thread exit.
275    pub fn flush_local(&self) {
276        LOCAL.with(|cell| {
277            let mut state = cell.borrow_mut();
278            let key = self as *const SpanCollector as usize;
279            if let Some(pos) = state.entries.iter().position(|(k, _)| *k == key) {
280                state.entries[pos].1.flush();
281            }
282        });
283    }
284
285    /// Drain all pending spans into the provided buffer.
286    ///
287    /// This is the primary method for exporters.  It collects spans from all
288    /// registered thread outboxes.  Spans still in thread-local buffers below
289    /// the flush threshold are NOT included unless [`flush_local`](Self::flush_local)
290    /// is called first (or the thread exits).
291    ///
292    /// The caller can reuse the buffer across export cycles to avoid repeated
293    /// allocation.
294    pub fn drain_into(&self, buf: &mut Vec<CompletedSpan>) {
295        let outboxes = self.outboxes.lock();
296        for outbox in outboxes.iter() {
297            let mut spans = outbox.spans.lock();
298            buf.append(&mut spans);
299            // Release excess capacity so drained outboxes don't hold onto
300            // large allocations between export cycles.
301            spans.shrink_to(FLUSH_THRESHOLD * 2);
302        }
303    }
304
305    /// Number of spans that were dropped.
306    ///
307    /// Always returns 0.  Retained for API compatibility; use
308    /// [`sampled_out_count`](Self::sampled_out_count) for adaptive sampling stats.
309    pub fn dropped_count(&self) -> u64 {
310        0
311    }
312
313    /// Total spans that passed adaptive sampling and were recorded.
314    pub fn recorded_count(&self) -> u64 {
315        self.spans_recorded.sum() as u64
316    }
317
318    /// Total spans that were dropped by adaptive sampling.
319    pub fn sampled_out_count(&self) -> u64 {
320        self.spans_sampled_out.sum() as u64
321    }
322
323    /// Current number of spans waiting across all outboxes.
324    ///
325    /// Does not include spans still in thread-local buffers that haven't
326    /// been flushed yet.
327    pub fn len(&self) -> usize {
328        let outboxes = self.outboxes.lock();
329        outboxes.iter().map(|o| o.spans.lock().len()).sum()
330    }
331
332    /// Returns `true` if all outboxes are empty.
333    ///
334    /// Does not account for spans in thread-local buffers below the flush
335    /// threshold.
336    pub fn is_empty(&self) -> bool {
337        let outboxes = self.outboxes.lock();
338        outboxes.iter().all(|o| o.spans.lock().is_empty())
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Same-thread helper: push this thread's buffered spans to its outbox,
    /// then harvest every outbox into `out`.
    fn flush_and_drain(collector: &SpanCollector, out: &mut Vec<CompletedSpan>) {
        collector.flush_local();
        collector.drain_into(out);
    }

    /// Like `flush_and_drain`, but returns the harvested spans in a new `Vec`.
    fn harvest(collector: &SpanCollector) -> Vec<CompletedSpan> {
        let mut out = Vec::new();
        flush_and_drain(collector, &mut out);
        out
    }

    #[test]
    fn start_and_drain() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _server = collector.start_span("op1", SpanKind::Server);
            let _client = collector.start_span("op2", SpanKind::Client);
        }
        assert_eq!(harvest(&collector).len(), 2);
    }

    #[test]
    fn small_batches_no_drops() {
        let collector = Arc::new(SpanCollector::new(1, 2));
        // Far below OUTBOX_CAPACITY, so every span must survive.
        {
            let _a = collector.start_span("a", SpanKind::Internal);
            let _b = collector.start_span("b", SpanKind::Internal);
            let _c = collector.start_span("c", SpanKind::Internal);
        }
        assert_eq!(harvest(&collector).len(), 3);
    }

    #[test]
    fn from_traceparent_valid() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        {
            let _handler = collector.start_span_from_traceparent(
                Some(traceparent),
                "handler",
                SpanKind::Server,
            );
        }
        let spans = harvest(&collector);
        assert_eq!(spans.len(), 1);
        assert_eq!(
            spans[0].trace_id.to_string(),
            "4bf92f3577b34da6a3ce929d0e0e4736"
        );
    }

    #[test]
    fn from_traceparent_invalid_falls_back() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _handler = collector.start_span_from_traceparent(
                Some("garbage"),
                "handler",
                SpanKind::Server,
            );
        }
        let spans = harvest(&collector);
        assert_eq!(spans.len(), 1);
        let only = &spans[0];
        assert!(!only.trace_id.is_invalid());
        assert!(only.parent_span_id.is_invalid());
    }

    #[test]
    fn from_traceparent_none_creates_root() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _handler =
                collector.start_span_from_traceparent(None, "handler", SpanKind::Server);
        }
        let spans = harvest(&collector);
        assert_eq!(spans.len(), 1);
        assert!(spans[0].parent_span_id.is_invalid());
    }

    #[test]
    fn concurrent_submission() {
        let collector = Arc::new(SpanCollector::new(8, 1024));
        let workers: Vec<_> = (0..4)
            .map(|t| {
                let c = Arc::clone(&collector);
                std::thread::spawn(move || {
                    for i in 0..100 {
                        let _span =
                            c.start_span(format!("thread_{}_span_{}", t, i), SpanKind::Internal);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("thread join");
        }

        // Each worker's thread-local state is flushed when the worker exits,
        // so draining alone sees every span.
        let mut spans = Vec::new();
        collector.drain_into(&mut spans);
        assert_eq!(spans.len(), 400);
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn flush_threshold_batching() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        // One short of the threshold: everything stays in the thread-local buffer.
        for _ in 0..FLUSH_THRESHOLD - 1 {
            drop(collector.start_span("sub_threshold", SpanKind::Internal));
        }
        assert_eq!(collector.len(), 0);

        // The next span reaches the threshold and triggers a flush.
        drop(collector.start_span("trigger", SpanKind::Internal));
        assert_eq!(collector.len(), FLUSH_THRESHOLD);
    }

    #[test]
    fn flush_local_forces_transfer() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        // Well below the threshold, so nothing is flushed automatically.
        (0..5).for_each(|_| drop(collector.start_span("local", SpanKind::Internal)));
        assert_eq!(collector.len(), 0);
        collector.flush_local();
        assert_eq!(collector.len(), 5);
    }

    #[test]
    fn thread_exit_flushes() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        let producer = {
            let c = Arc::clone(&collector);
            std::thread::spawn(move || {
                // Stay below FLUSH_THRESHOLD; only the thread-exit flush can
                // deliver these spans.
                for _ in 0..10 {
                    let _span = c.start_span("thread_exit", SpanKind::Internal);
                }
            })
        };
        producer.join().expect("thread join");

        let mut spans = Vec::new();
        collector.drain_into(&mut spans);
        assert_eq!(spans.len(), 10);
    }
}