1use std::borrow::Cow;
16use std::cell::RefCell;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20
21use super::context::SpanContext;
22use super::types::{CollectorRef, CompletedSpan, Span, SpanKind};
23use crate::metric::Counter;
24
25const FLUSH_THRESHOLD: usize = 64;
28
29const OUTBOX_CAPACITY: usize = 4096;
33
34struct Outbox {
36 spans: Mutex<Vec<CompletedSpan>>,
37}
38
39impl Outbox {
40 fn new() -> Self {
41 Self {
42 spans: Mutex::new(Vec::with_capacity(FLUSH_THRESHOLD * 2)),
43 }
44 }
45}
46
47struct ThreadBuffer {
49 buffer: Vec<CompletedSpan>,
51 outbox: Arc<Outbox>,
53 sample_shift: u32,
56 span_counter: u64,
58}
59
60impl ThreadBuffer {
61 fn new(outbox: Arc<Outbox>) -> Self {
62 Self {
63 buffer: Vec::with_capacity(FLUSH_THRESHOLD),
64 outbox,
65 sample_shift: 0,
66 span_counter: 0,
67 }
68 }
69
70 #[inline]
74 fn should_record(&mut self) -> bool {
75 self.span_counter = self.span_counter.wrapping_add(1);
76 if self.sample_shift == 0 {
77 return true;
78 }
79 (self.span_counter & ((1u64 << self.sample_shift) - 1)) == 0
80 }
81
82 #[inline]
83 fn push(&mut self, span: CompletedSpan) {
84 self.buffer.push(span);
85 if self.buffer.len() >= FLUSH_THRESHOLD {
86 self.flush();
87 }
88 }
89
90 fn flush(&mut self) {
91 if !self.buffer.is_empty() {
92 let mut outbox = self.outbox.spans.lock();
93 let occupancy = outbox.len();
94 if occupancy < OUTBOX_CAPACITY {
95 outbox.append(&mut self.buffer);
96 } else {
97 self.buffer.clear();
98 }
99 self.sample_shift = if occupancy <= OUTBOX_CAPACITY / 4 {
101 0 } else if occupancy <= OUTBOX_CAPACITY / 2 {
103 5 } else if occupancy <= OUTBOX_CAPACITY * 3 / 4 {
105 6 } else {
107 7 };
109 }
110 }
111}
112
113impl Drop for ThreadBuffer {
114 fn drop(&mut self) {
115 self.flush();
116 }
117}
118
119struct ThreadLocalState {
125 entries: Vec<(usize, ThreadBuffer)>,
128}
129
130impl ThreadLocalState {
131 fn new() -> Self {
132 Self {
133 entries: Vec::new(),
134 }
135 }
136
137 #[inline]
138 fn get_or_register(&mut self, collector: &SpanCollector) -> &mut ThreadBuffer {
139 let key = collector as *const SpanCollector as usize;
140 let pos = self.entries.iter().position(|(k, _)| *k == key);
142 if let Some(pos) = pos {
143 return &mut self.entries[pos].1;
144 }
145 self.register(collector, key)
146 }
147
148 #[cold]
149 fn register(&mut self, collector: &SpanCollector, key: usize) -> &mut ThreadBuffer {
150 let outbox = Arc::new(Outbox::new());
152 collector.outboxes.lock().push(Arc::clone(&outbox));
153 self.entries.push((key, ThreadBuffer::new(outbox)));
154 &mut self.entries.last_mut().expect("just pushed").1
155 }
156}
157
158impl Drop for ThreadLocalState {
159 fn drop(&mut self) {
160 for (_, buffer) in &mut self.entries {
162 buffer.flush();
163 }
164 }
165}
166
167thread_local! {
168 static LOCAL: RefCell<ThreadLocalState> = RefCell::new(ThreadLocalState::new());
169}
170
171pub struct SpanCollector {
179 outboxes: Mutex<Vec<Arc<Outbox>>>,
183 spans_recorded: Counter,
185 spans_sampled_out: Counter,
187}
188
189impl SpanCollector {
190 pub fn new(_num_shards: usize, _capacity_per_shard: usize) -> Self {
196 Self {
197 outboxes: Mutex::new(Vec::new()),
198 spans_recorded: Counter::new(8),
199 spans_sampled_out: Counter::new(8),
200 }
201 }
202
203 pub fn start_span(
209 self: &Arc<Self>,
210 name: impl Into<Cow<'static, str>>,
211 kind: SpanKind,
212 ) -> Span {
213 let collector_ref = CollectorRef::from_arc(self);
214 if self.should_record() {
215 self.spans_recorded.inc();
216 Span::new_root(name, kind, collector_ref)
217 } else {
218 self.spans_sampled_out.inc();
219 Span::noop(collector_ref)
220 }
221 }
222
223 pub fn start_span_from_traceparent(
231 self: &Arc<Self>,
232 traceparent: Option<&str>,
233 name: impl Into<Cow<'static, str>>,
234 kind: SpanKind,
235 ) -> Span {
236 let collector_ref = CollectorRef::from_arc(self);
237 if !self.should_record() {
238 self.spans_sampled_out.inc();
239 return Span::noop(collector_ref);
240 }
241 self.spans_recorded.inc();
242 match traceparent.and_then(SpanContext::from_traceparent) {
243 Some(remote_ctx) => Span::new_from_remote(name, kind, remote_ctx, collector_ref),
244 None => Span::new_root(name, kind, collector_ref),
245 }
246 }
247
248 #[inline]
253 fn should_record(&self) -> bool {
254 LOCAL.with(|cell| cell.borrow_mut().get_or_register(self).should_record())
255 }
256
257 #[inline]
263 pub(crate) fn submit(&self, span: CompletedSpan) {
264 LOCAL.with(|cell| {
265 cell.borrow_mut().get_or_register(self).push(span);
266 });
267 }
268
269 pub fn flush_local(&self) {
276 LOCAL.with(|cell| {
277 let mut state = cell.borrow_mut();
278 let key = self as *const SpanCollector as usize;
279 if let Some(pos) = state.entries.iter().position(|(k, _)| *k == key) {
280 state.entries[pos].1.flush();
281 }
282 });
283 }
284
285 pub fn drain_into(&self, buf: &mut Vec<CompletedSpan>) {
295 let outboxes = self.outboxes.lock();
296 for outbox in outboxes.iter() {
297 let mut spans = outbox.spans.lock();
298 buf.append(&mut spans);
299 spans.shrink_to(FLUSH_THRESHOLD * 2);
302 }
303 }
304
305 pub fn dropped_count(&self) -> u64 {
310 0
311 }
312
313 pub fn recorded_count(&self) -> u64 {
315 self.spans_recorded.sum() as u64
316 }
317
318 pub fn sampled_out_count(&self) -> u64 {
320 self.spans_sampled_out.sum() as u64
321 }
322
323 pub fn len(&self) -> usize {
328 let outboxes = self.outboxes.lock();
329 outboxes.iter().map(|o| o.spans.lock().len()).sum()
330 }
331
332 pub fn is_empty(&self) -> bool {
337 let outboxes = self.outboxes.lock();
338 outboxes.iter().all(|o| o.spans.lock().is_empty())
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345
346 fn flush_and_drain(collector: &SpanCollector, buf: &mut Vec<CompletedSpan>) {
348 collector.flush_local();
349 collector.drain_into(buf);
350 }
351
352 #[test]
353 fn start_and_drain() {
354 let collector = Arc::new(SpanCollector::new(1, 16));
355 {
356 let _span = collector.start_span("op1", SpanKind::Server);
357 let _span2 = collector.start_span("op2", SpanKind::Client);
358 }
359 let mut buf = Vec::new();
360 flush_and_drain(&collector, &mut buf);
361 assert_eq!(buf.len(), 2);
362 }
363
364 #[test]
365 fn small_batches_no_drops() {
366 let collector = Arc::new(SpanCollector::new(1, 2));
367 {
369 let _s1 = collector.start_span("a", SpanKind::Internal);
370 let _s2 = collector.start_span("b", SpanKind::Internal);
371 let _s3 = collector.start_span("c", SpanKind::Internal);
372 }
373 let mut buf = Vec::new();
374 flush_and_drain(&collector, &mut buf);
375 assert_eq!(buf.len(), 3);
376 }
377
378 #[test]
379 fn from_traceparent_valid() {
380 let collector = Arc::new(SpanCollector::new(1, 16));
381 let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
382 {
383 let _span =
384 collector.start_span_from_traceparent(Some(tp), "handler", SpanKind::Server);
385 }
386 let mut buf = Vec::new();
387 flush_and_drain(&collector, &mut buf);
388 assert_eq!(buf.len(), 1);
389 assert_eq!(
390 buf[0].trace_id.to_string(),
391 "4bf92f3577b34da6a3ce929d0e0e4736"
392 );
393 }
394
395 #[test]
396 fn from_traceparent_invalid_falls_back() {
397 let collector = Arc::new(SpanCollector::new(1, 16));
398 {
399 let _span =
400 collector.start_span_from_traceparent(Some("garbage"), "handler", SpanKind::Server);
401 }
402 let mut buf = Vec::new();
403 flush_and_drain(&collector, &mut buf);
404 assert_eq!(buf.len(), 1);
405 assert!(!buf[0].trace_id.is_invalid());
406 assert!(buf[0].parent_span_id.is_invalid());
407 }
408
409 #[test]
410 fn from_traceparent_none_creates_root() {
411 let collector = Arc::new(SpanCollector::new(1, 16));
412 {
413 let _span = collector.start_span_from_traceparent(None, "handler", SpanKind::Server);
414 }
415 let mut buf = Vec::new();
416 flush_and_drain(&collector, &mut buf);
417 assert_eq!(buf.len(), 1);
418 assert!(buf[0].parent_span_id.is_invalid());
419 }
420
421 #[test]
422 fn concurrent_submission() {
423 let collector = Arc::new(SpanCollector::new(8, 1024));
424 let mut handles = Vec::new();
425
426 for t in 0..4 {
427 let c = Arc::clone(&collector);
428 handles.push(std::thread::spawn(move || {
429 for i in 0..100 {
430 let _span =
431 c.start_span(format!("thread_{}_span_{}", t, i), SpanKind::Internal);
432 }
433 }));
434 }
435
436 for h in handles {
437 h.join().expect("thread join");
438 }
439
440 let mut buf = Vec::new();
442 collector.drain_into(&mut buf);
443 assert_eq!(buf.len(), 400);
444 assert_eq!(collector.dropped_count(), 0);
445 }
446
447 #[test]
448 fn flush_threshold_batching() {
449 let collector = Arc::new(SpanCollector::new(1, 64));
450 for _ in 0..FLUSH_THRESHOLD - 1 {
453 let _span = collector.start_span("sub_threshold", SpanKind::Internal);
454 }
455 assert_eq!(collector.len(), 0);
457
458 {
460 let _span = collector.start_span("trigger", SpanKind::Internal);
461 }
462 assert_eq!(collector.len(), FLUSH_THRESHOLD);
464 }
465
466 #[test]
467 fn flush_local_forces_transfer() {
468 let collector = Arc::new(SpanCollector::new(1, 64));
469 for _ in 0..5 {
471 let _span = collector.start_span("local", SpanKind::Internal);
472 }
473 assert_eq!(collector.len(), 0);
474 collector.flush_local();
475 assert_eq!(collector.len(), 5);
476 }
477
478 #[test]
479 fn thread_exit_flushes() {
480 let collector = Arc::new(SpanCollector::new(1, 64));
481 let c = Arc::clone(&collector);
482 let handle = std::thread::spawn(move || {
483 for _ in 0..10 {
485 let _span = c.start_span("thread_exit", SpanKind::Internal);
486 }
487 });
489 handle.join().expect("thread join");
490
491 let mut buf = Vec::new();
492 collector.drain_into(&mut buf);
493 assert_eq!(buf.len(), 10);
494 }
495}