fast_telemetry/span/collector.rs
1//! Thread-local span collector with zero-atomic submit path.
2//!
3//! Each thread buffers completed spans in a thread-local [`Vec`] — avoiding atomics,
4//! CAS loops, and hopefully cache-line contention. When the buffer reaches
5//! [`FLUSH_THRESHOLD`] spans (or on thread exit), it is moved to a shared
6//! outbox that the exporter drains via [`SpanCollector::drain_into`].
7//!
8//! The outbox transfer uses a [`parking_lot::Mutex`] that is per-thread and
9//! therefore uncontended during normal operation (the exporter only touches it
10//! every few seconds).
11//!
12//! Each outbox is capped at [`OUTBOX_CAPACITY`] spans to bound memory.
13//! When the outbox is full, flushes are silently dropped.
14
15use std::borrow::Cow;
16use std::cell::UnsafeCell;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use super::context::SpanContext;
22use super::types::{CollectorRef, CompletedSpan, Span, SpanKind};
23use crate::metric::Counter;
24
25/// Number of spans buffered thread-locally before flushing to the shared outbox.
26/// Higher values amortize the mutex cost but increase latency to export.
27const FLUSH_THRESHOLD: usize = 64;
28
29/// Maximum number of spans held per outbox. When a flush would exceed this
30/// limit the batch is silently dropped, bounding memory to
31/// `OUTBOX_CAPACITY × num_threads × sizeof(CompletedSpan)`.
32const OUTBOX_CAPACITY: usize = 4096;
33
34/// A shared outbox that a single thread flushes into and the exporter drains.
35struct Outbox {
36 spans: Mutex<Vec<CompletedSpan>>,
37}
38
39impl Outbox {
40 fn new() -> Self {
41 Self {
42 spans: Mutex::new(Vec::with_capacity(FLUSH_THRESHOLD * 2)),
43 }
44 }
45}
46
47/// Per-thread buffer for a single [`SpanCollector`].
48struct ThreadBuffer {
49 /// Thread-local span buffer. `push()` is a plain Vec append — zero atomics.
50 buffer: Vec<CompletedSpan>,
51 /// Shared outbox registered with the collector.
52 outbox: Arc<Outbox>,
53 /// Adaptive sampling: log2 of the sampling denominator.
54 /// 0 = record every span, 5 = 1/32, 6 = 1/64, 7 = 1/128.
55 sample_shift: u32,
56 /// Monotonic counter for sampling decisions.
57 span_counter: u64,
58}
59
60impl ThreadBuffer {
61 fn new(outbox: Arc<Outbox>) -> Self {
62 Self {
63 buffer: Vec::with_capacity(FLUSH_THRESHOLD),
64 outbox,
65 sample_shift: 0,
66 span_counter: 0,
67 }
68 }
69
70 /// Returns `true` if the next span should be recorded, based on the
71 /// current adaptive sampling rate. Pure thread-local arithmetic —
72 /// zero atomics.
73 #[inline]
74 fn should_record(&mut self) -> bool {
75 self.span_counter = self.span_counter.wrapping_add(1);
76 if self.sample_shift == 0 {
77 return true;
78 }
79 (self.span_counter & ((1u64 << self.sample_shift) - 1)) == 0
80 }
81
82 #[inline]
83 fn push(&mut self, span: CompletedSpan) {
84 self.buffer.push(span);
85 if self.buffer.len() >= FLUSH_THRESHOLD {
86 self.flush();
87 }
88 }
89
90 fn flush(&mut self) {
91 if !self.buffer.is_empty() {
92 let mut outbox = self.outbox.spans.lock();
93 let occupancy = outbox.len();
94 if occupancy < OUTBOX_CAPACITY {
95 outbox.append(&mut self.buffer);
96 } else {
97 self.buffer.clear();
98 }
99 // Adjust sampling rate based on outbox pressure.
100 self.sample_shift = if occupancy <= OUTBOX_CAPACITY / 4 {
101 0 // ≤25% full — record everything
102 } else if occupancy <= OUTBOX_CAPACITY / 2 {
103 5 // ≤50% — 1/32
104 } else if occupancy <= OUTBOX_CAPACITY * 3 / 4 {
105 6 // ≤75% — 1/64
106 } else {
107 7 // >75% — 1/128
108 };
109 }
110 }
111}
112
113impl Drop for ThreadBuffer {
114 fn drop(&mut self) {
115 self.flush();
116 }
117}
118
119/// Per-thread state: maps collector pointer → thread buffer.
120///
121/// Uses a raw pointer as key to avoid Arc overhead on the collector itself.
122/// This is safe because `submit()` is called through a `CollectorRef` that
123/// is guaranteed to outlive the span (see `CollectorRef` safety comments).
124struct ThreadLocalState {
125 /// Sorted by collector pointer for binary search. In practice there is
126 /// one collector per process, so this is a single-element vec.
127 entries: Vec<(usize, ThreadBuffer)>,
128}
129
130impl ThreadLocalState {
131 fn new() -> Self {
132 Self {
133 entries: Vec::new(),
134 }
135 }
136
137 #[inline]
138 fn get_or_register(&mut self, collector: &SpanCollector) -> &mut ThreadBuffer {
139 let key = collector as *const SpanCollector as usize;
140 // Fast path: check if we already have an entry for this collector.
141 let pos = self.entries.iter().position(|(k, _)| *k == key);
142 if let Some(pos) = pos {
143 return &mut self.entries[pos].1;
144 }
145 self.register(collector, key)
146 }
147
148 #[cold]
149 fn register(&mut self, collector: &SpanCollector, key: usize) -> &mut ThreadBuffer {
150 // Register a new outbox with the collector.
151 let outbox = Arc::new(Outbox::new());
152 collector.outboxes.lock().push(Arc::clone(&outbox));
153 self.entries.push((key, ThreadBuffer::new(outbox)));
154 &mut self.entries.last_mut().expect("just pushed").1
155 }
156}
157
158impl Drop for ThreadLocalState {
159 fn drop(&mut self) {
160 // Flush all remaining spans on thread exit.
161 for (_, buffer) in &mut self.entries {
162 buffer.flush();
163 }
164 }
165}
166
167thread_local! {
168 /// Thread-local span state.
169 ///
170 /// Uses `UnsafeCell` rather than `RefCell` to skip the runtime borrow
171 /// check on the per-span hot path (~5 cycles saved per submit). Safety
172 /// is preserved by enforcing two invariants:
173 /// 1. All access is single-threaded (`thread_local!`).
174 /// 2. Every `&mut` borrow is fully consumed inside the closure passed
175 /// to `LOCAL.with`. No closure recursively re-enters `with`, and
176 /// none of the called methods (`get_or_register`, `push`,
177 /// `should_record`) re-enter the cell.
178 static LOCAL: UnsafeCell<ThreadLocalState> = UnsafeCell::new(ThreadLocalState::new());
179}
180
181/// Thread-local span collector with zero-atomic submit path.
182///
183/// Completed spans are buffered in a thread-local [`Vec`] and periodically
184/// flushed to a shared outbox. The exporter calls
185/// [`drain_into`](SpanCollector::drain_into) to harvest all pending spans.
186///
187/// Created explicitly and held as `Arc<SpanCollector>`.
188pub struct SpanCollector {
189 /// Registered per-thread outboxes. Lock is taken only when:
190 /// (a) a new thread first submits a span (registration), or
191 /// (b) the exporter drains spans.
192 outboxes: Mutex<Vec<Arc<Outbox>>>,
193 /// Spans that were recorded (passed sampling).
194 spans_recorded: Counter,
195 /// Spans that were dropped by adaptive sampling.
196 spans_sampled_out: Counter,
197}
198
199impl SpanCollector {
200 /// Create a new collector.
201 ///
202 /// The `_num_shards` and `_capacity_per_shard` parameters are accepted for
203 /// API compatibility but are no longer used — each thread gets its own
204 /// buffer automatically, and buffers are unbounded.
205 pub fn new(_num_shards: usize, _capacity_per_shard: usize) -> Self {
206 Self {
207 outboxes: Mutex::new(Vec::new()),
208 spans_recorded: Counter::new(8),
209 spans_sampled_out: Counter::new(8),
210 }
211 }
212
213 /// Create a new root span with a fresh trace ID.
214 ///
215 /// The span is associated with this collector and will be submitted
216 /// here when it drops. Under high load, adaptive sampling may return
217 /// a no-op span that skips all recording and submission.
218 pub fn start_span(
219 self: &Arc<Self>,
220 name: impl Into<Cow<'static, str>>,
221 kind: SpanKind,
222 ) -> Span {
223 let collector_ref = CollectorRef::from_arc(self);
224 if self.should_record() {
225 self.spans_recorded.inc();
226 Span::new_root(name, kind, collector_ref)
227 } else {
228 self.spans_sampled_out.inc();
229 Span::noop(collector_ref)
230 }
231 }
232
233 /// Create a root span from an incoming W3C `traceparent` header.
234 ///
235 /// If the header is valid, the span inherits the remote trace ID and sets
236 /// `parent_span_id` to the remote span ID. If the header is `None` or
237 /// invalid, behaves like [`start_span`](Self::start_span) (new trace ID).
238 ///
239 /// Adaptive sampling applies: under load, may return a no-op span.
240 pub fn start_span_from_traceparent(
241 self: &Arc<Self>,
242 traceparent: Option<&str>,
243 name: impl Into<Cow<'static, str>>,
244 kind: SpanKind,
245 ) -> Span {
246 let collector_ref = CollectorRef::from_arc(self);
247 if !self.should_record() {
248 self.spans_sampled_out.inc();
249 return Span::noop(collector_ref);
250 }
251 self.spans_recorded.inc();
252 match traceparent.and_then(SpanContext::from_traceparent) {
253 Some(remote_ctx) => Span::new_from_remote(name, kind, remote_ctx, collector_ref),
254 None => Span::new_root(name, kind, collector_ref),
255 }
256 }
257
258 /// Check the thread-local adaptive sampling counter.
259 ///
260 /// Returns `true` if the next span should be recorded. Pure thread-local
261 /// arithmetic — zero atomics, zero contention.
262 #[inline]
263 fn should_record(&self) -> bool {
264 LOCAL.with(|cell| {
265 // SAFETY: see LOCAL definition. Single-thread access; the &mut
266 // borrow is fully consumed before this closure returns.
267 let state = unsafe { &mut *cell.get() };
268 state.get_or_register(self).should_record()
269 })
270 }
271
272 /// Submit a completed span. Called by [`Span::drop`].
273 ///
274 /// Pushes to a thread-local `Vec` — zero atomics on the fast path.
275 /// Every [`FLUSH_THRESHOLD`] spans, the buffer is moved to the shared
276 /// outbox under a per-thread mutex (uncontended).
277 #[inline]
278 pub(crate) fn submit(&self, span: CompletedSpan) {
279 LOCAL.with(|cell| {
280 // SAFETY: see LOCAL definition. Single-thread access; the &mut
281 // borrow is fully consumed before this closure returns.
282 let state = unsafe { &mut *cell.get() };
283 state.get_or_register(self).push(span);
284 });
285 }
286
287 /// Flush the current thread's local buffer to the shared outbox.
288 ///
289 /// Call this before [`drain_into`](Self::drain_into) when running on the
290 /// same thread that submitted spans (e.g., in tests or single-threaded
291 /// exporters). In production, thread-local buffers are flushed
292 /// automatically when they reach [`FLUSH_THRESHOLD`] or on thread exit.
293 pub fn flush_local(&self) {
294 LOCAL.with(|cell| {
295 // SAFETY: see LOCAL definition.
296 let state = unsafe { &mut *cell.get() };
297 let key = self as *const SpanCollector as usize;
298 if let Some(pos) = state.entries.iter().position(|(k, _)| *k == key) {
299 state.entries[pos].1.flush();
300 }
301 });
302 }
303
304 /// Drain all pending spans into the provided buffer.
305 ///
306 /// This is the primary method for exporters. It collects spans from all
307 /// registered thread outboxes. Spans still in thread-local buffers below
308 /// the flush threshold are NOT included unless [`flush_local`](Self::flush_local)
309 /// is called first (or the thread exits).
310 ///
311 /// The caller can reuse the buffer across export cycles to avoid repeated
312 /// allocation.
313 pub fn drain_into(&self, buf: &mut Vec<CompletedSpan>) {
314 let outboxes = self.outboxes.lock();
315 for outbox in outboxes.iter() {
316 let mut spans = outbox.spans.lock();
317 buf.append(&mut spans);
318 // Release excess capacity so drained outboxes don't hold onto
319 // large allocations between export cycles.
320 spans.shrink_to(FLUSH_THRESHOLD * 2);
321 }
322 }
323
324 /// Number of spans that were dropped.
325 ///
326 /// Always returns 0. Retained for API compatibility; use
327 /// [`sampled_out_count`](Self::sampled_out_count) for adaptive sampling stats.
328 pub fn dropped_count(&self) -> u64 {
329 0
330 }
331
332 /// Total spans that passed adaptive sampling and were recorded.
333 pub fn recorded_count(&self) -> u64 {
334 self.spans_recorded.sum() as u64
335 }
336
337 /// Total spans that were dropped by adaptive sampling.
338 pub fn sampled_out_count(&self) -> u64 {
339 self.spans_sampled_out.sum() as u64
340 }
341
342 /// Current number of spans waiting across all outboxes.
343 ///
344 /// Does not include spans still in thread-local buffers that haven't
345 /// been flushed yet.
346 pub fn len(&self) -> usize {
347 let outboxes = self.outboxes.lock();
348 outboxes.iter().map(|o| o.spans.lock().len()).sum()
349 }
350
351 /// Returns `true` if all outboxes are empty.
352 ///
353 /// Does not account for spans in thread-local buffers below the flush
354 /// threshold.
355 pub fn is_empty(&self) -> bool {
356 let outboxes = self.outboxes.lock();
357 outboxes.iter().all(|o| o.spans.lock().is_empty())
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364
365 /// Helper: flush + drain for same-thread tests.
366 fn flush_and_drain(collector: &SpanCollector, buf: &mut Vec<CompletedSpan>) {
367 collector.flush_local();
368 collector.drain_into(buf);
369 }
370
371 #[test]
372 fn start_and_drain() {
373 let collector = Arc::new(SpanCollector::new(1, 16));
374 {
375 let _span = collector.start_span("op1", SpanKind::Server);
376 let _span2 = collector.start_span("op2", SpanKind::Client);
377 }
378 let mut buf = Vec::new();
379 flush_and_drain(&collector, &mut buf);
380 assert_eq!(buf.len(), 2);
381 }
382
383 #[test]
384 fn small_batches_no_drops() {
385 let collector = Arc::new(SpanCollector::new(1, 2));
386 // Small batches below OUTBOX_CAPACITY are fully collected.
387 {
388 let _s1 = collector.start_span("a", SpanKind::Internal);
389 let _s2 = collector.start_span("b", SpanKind::Internal);
390 let _s3 = collector.start_span("c", SpanKind::Internal);
391 }
392 let mut buf = Vec::new();
393 flush_and_drain(&collector, &mut buf);
394 assert_eq!(buf.len(), 3);
395 }
396
397 #[test]
398 fn from_traceparent_valid() {
399 let collector = Arc::new(SpanCollector::new(1, 16));
400 let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
401 {
402 let _span =
403 collector.start_span_from_traceparent(Some(tp), "handler", SpanKind::Server);
404 }
405 let mut buf = Vec::new();
406 flush_and_drain(&collector, &mut buf);
407 assert_eq!(buf.len(), 1);
408 assert_eq!(
409 buf[0].trace_id.to_string(),
410 "4bf92f3577b34da6a3ce929d0e0e4736"
411 );
412 }
413
414 #[test]
415 fn from_traceparent_invalid_falls_back() {
416 let collector = Arc::new(SpanCollector::new(1, 16));
417 {
418 let _span =
419 collector.start_span_from_traceparent(Some("garbage"), "handler", SpanKind::Server);
420 }
421 let mut buf = Vec::new();
422 flush_and_drain(&collector, &mut buf);
423 assert_eq!(buf.len(), 1);
424 assert!(!buf[0].trace_id.is_invalid());
425 assert!(buf[0].parent_span_id.is_invalid());
426 }
427
428 #[test]
429 fn from_traceparent_none_creates_root() {
430 let collector = Arc::new(SpanCollector::new(1, 16));
431 {
432 let _span = collector.start_span_from_traceparent(None, "handler", SpanKind::Server);
433 }
434 let mut buf = Vec::new();
435 flush_and_drain(&collector, &mut buf);
436 assert_eq!(buf.len(), 1);
437 assert!(buf[0].parent_span_id.is_invalid());
438 }
439
440 #[test]
441 fn concurrent_submission() {
442 let collector = Arc::new(SpanCollector::new(8, 1024));
443 let mut handles = Vec::new();
444
445 for t in 0..4 {
446 let c = Arc::clone(&collector);
447 handles.push(std::thread::spawn(move || {
448 for i in 0..100 {
449 let _span =
450 c.start_span(format!("thread_{}_span_{}", t, i), SpanKind::Internal);
451 }
452 }));
453 }
454
455 for h in handles {
456 h.join().expect("thread join");
457 }
458
459 // Thread-local Drop flushes on thread exit, so drain_into is sufficient.
460 let mut buf = Vec::new();
461 collector.drain_into(&mut buf);
462 assert_eq!(buf.len(), 400);
463 assert_eq!(collector.dropped_count(), 0);
464 }
465
466 #[test]
467 fn flush_threshold_batching() {
468 let collector = Arc::new(SpanCollector::new(1, 64));
469 // Submit fewer spans than FLUSH_THRESHOLD — they should stay in
470 // thread-local buffer until flushed or the threshold is reached.
471 for _ in 0..FLUSH_THRESHOLD - 1 {
472 let _span = collector.start_span("sub_threshold", SpanKind::Internal);
473 }
474 // Outbox should be empty (all in thread-local buffer).
475 assert_eq!(collector.len(), 0);
476
477 // Submit one more to cross the threshold.
478 {
479 let _span = collector.start_span("trigger", SpanKind::Internal);
480 }
481 // Now the outbox should have FLUSH_THRESHOLD spans.
482 assert_eq!(collector.len(), FLUSH_THRESHOLD);
483 }
484
485 #[test]
486 fn flush_local_forces_transfer() {
487 let collector = Arc::new(SpanCollector::new(1, 64));
488 // Submit fewer than threshold.
489 for _ in 0..5 {
490 let _span = collector.start_span("local", SpanKind::Internal);
491 }
492 assert_eq!(collector.len(), 0);
493 collector.flush_local();
494 assert_eq!(collector.len(), 5);
495 }
496
497 #[test]
498 fn thread_exit_flushes() {
499 let collector = Arc::new(SpanCollector::new(1, 64));
500 let c = Arc::clone(&collector);
501 let handle = std::thread::spawn(move || {
502 // Submit fewer than FLUSH_THRESHOLD.
503 for _ in 0..10 {
504 let _span = c.start_span("thread_exit", SpanKind::Internal);
505 }
506 // Thread-local Drop should flush on thread exit.
507 });
508 handle.join().expect("thread join");
509
510 let mut buf = Vec::new();
511 collector.drain_into(&mut buf);
512 assert_eq!(buf.len(), 10);
513 }
514}