use std::borrow::Cow;
use std::cell::UnsafeCell;
use std::sync::Arc;
use parking_lot::Mutex;
use super::context::SpanContext;
use super::types::{CollectorRef, CompletedSpan, Span, SpanKind};
use crate::metric::Counter;
/// Number of spans a thread accumulates locally before moving them to its
/// shared outbox in one batch.
const FLUSH_THRESHOLD: usize = 64;
/// Soft cap on spans held in one outbox. Batches arriving while the outbox
/// is at or above this occupancy are dropped (see `ThreadBuffer::flush`).
const OUTBOX_CAPACITY: usize = 4096;
/// Mutex-protected staging area: one producing thread appends span batches,
/// and the collector drains them.
struct Outbox {
    // Completed spans awaiting `SpanCollector::drain_into`.
    spans: Mutex<Vec<CompletedSpan>>,
}
impl Outbox {
fn new() -> Self {
Self {
spans: Mutex::new(Vec::with_capacity(FLUSH_THRESHOLD * 2)),
}
}
}
/// Per-thread, single-owner staging buffer feeding one collector's outbox.
struct ThreadBuffer {
    // Locally accumulated spans; moved to `outbox` once FLUSH_THRESHOLD is
    // reached (see `push`).
    buffer: Vec<CompletedSpan>,
    // Shared outbox this thread's batches are appended to.
    outbox: Arc<Outbox>,
    // Back-pressure sampling: record 1 of every 2^sample_shift spans
    // (0 = record everything). Recomputed on each flush from outbox
    // occupancy.
    sample_shift: u32,
    // Wrapping count of `should_record` calls; provides the sampling phase.
    span_counter: u64,
}
impl ThreadBuffer {
fn new(outbox: Arc<Outbox>) -> Self {
Self {
buffer: Vec::with_capacity(FLUSH_THRESHOLD),
outbox,
sample_shift: 0,
span_counter: 0,
}
}
#[inline]
fn should_record(&mut self) -> bool {
self.span_counter = self.span_counter.wrapping_add(1);
if self.sample_shift == 0 {
return true;
}
(self.span_counter & ((1u64 << self.sample_shift) - 1)) == 0
}
#[inline]
fn push(&mut self, span: CompletedSpan) {
self.buffer.push(span);
if self.buffer.len() >= FLUSH_THRESHOLD {
self.flush();
}
}
fn flush(&mut self) {
if !self.buffer.is_empty() {
let mut outbox = self.outbox.spans.lock();
let occupancy = outbox.len();
if occupancy < OUTBOX_CAPACITY {
outbox.append(&mut self.buffer);
} else {
self.buffer.clear();
}
self.sample_shift = if occupancy <= OUTBOX_CAPACITY / 4 {
0 } else if occupancy <= OUTBOX_CAPACITY / 2 {
5 } else if occupancy <= OUTBOX_CAPACITY * 3 / 4 {
6 } else {
7 };
}
}
}
impl Drop for ThreadBuffer {
    /// Flushes any spans still buffered when the buffer is destroyed (e.g.
    /// at thread exit), so they are not lost.
    fn drop(&mut self) {
        self.flush();
    }
}
/// Per-thread registry mapping collector addresses to their thread buffers.
struct ThreadLocalState {
    // (collector address, buffer) pairs. A linear Vec scan is used since a
    // process is expected to hold very few collectors.
    entries: Vec<(usize, ThreadBuffer)>,
}
impl ThreadLocalState {
fn new() -> Self {
Self {
entries: Vec::new(),
}
}
#[inline]
fn get_or_register(&mut self, collector: &SpanCollector) -> &mut ThreadBuffer {
let key = collector as *const SpanCollector as usize;
let pos = self.entries.iter().position(|(k, _)| *k == key);
if let Some(pos) = pos {
return &mut self.entries[pos].1;
}
self.register(collector, key)
}
#[cold]
fn register(&mut self, collector: &SpanCollector, key: usize) -> &mut ThreadBuffer {
let outbox = Arc::new(Outbox::new());
collector.outboxes.lock().push(Arc::clone(&outbox));
self.entries.push((key, ThreadBuffer::new(outbox)));
&mut self.entries.last_mut().expect("just pushed").1
}
}
impl Drop for ThreadLocalState {
    /// Flushes every registered buffer at thread exit. Dropping `entries`
    /// would trigger the same flushes via `ThreadBuffer`'s own `Drop`;
    /// doing it here as well is explicit and harmless (the second flush
    /// sees an empty buffer and is a no-op).
    fn drop(&mut self) {
        self.entries
            .iter_mut()
            .for_each(|(_, buffer)| buffer.flush());
    }
}
thread_local! {
    // Per-thread registry of (collector, buffer) pairs. Uses UnsafeCell
    // rather than RefCell — presumably to skip the borrow-flag check on
    // the hot path; TODO confirm. This is sound only while every access
    // stays inside a single `LOCAL.with` closure and nothing called there
    // re-enters LOCAL (true for the current callers: `should_record`,
    // `submit`, `flush_local`).
    static LOCAL: UnsafeCell<ThreadLocalState> = UnsafeCell::new(ThreadLocalState::new());
}
/// Collects completed spans from many threads with low contention: each
/// thread batches spans in a thread-local buffer and periodically moves
/// them into a per-thread outbox registered here.
pub struct SpanCollector {
    // Every outbox registered by a producing thread for this collector.
    outboxes: Mutex<Vec<Arc<Outbox>>>,
    // Count of spans actually recorded (not sampled out).
    spans_recorded: Counter,
    // Count of spans skipped by back-pressure sampling.
    spans_sampled_out: Counter,
}
impl SpanCollector {
    /// Creates a new collector.
    ///
    /// The sharding parameters are unused: spans are staged in per-thread
    /// outboxes that are registered lazily, so there is nothing to shard
    /// up front. They are kept for backward compatibility with callers.
    pub fn new(_num_shards: usize, _capacity_per_shard: usize) -> Self {
        Self {
            outboxes: Mutex::new(Vec::new()),
            spans_recorded: Counter::new(8),
            spans_sampled_out: Counter::new(8),
        }
    }

    /// Starts a new root span of the given `kind`.
    ///
    /// Equivalent to `start_span_from_traceparent(None, name, kind)`; the
    /// two entry points share one implementation so sampling and counter
    /// bookkeeping cannot drift apart.
    pub fn start_span(
        self: &Arc<Self>,
        name: impl Into<Cow<'static, str>>,
        kind: SpanKind,
    ) -> Span {
        self.start_span_from_traceparent(None, name, kind)
    }

    /// Starts a span attached to the remote trace described by
    /// `traceparent` when the header parses; otherwise starts a new root
    /// span. Returns a no-op span when the calling thread is currently
    /// being sampled out due to outbox back-pressure.
    pub fn start_span_from_traceparent(
        self: &Arc<Self>,
        traceparent: Option<&str>,
        name: impl Into<Cow<'static, str>>,
        kind: SpanKind,
    ) -> Span {
        let collector_ref = CollectorRef::from_arc(self);
        if !self.should_record() {
            self.spans_sampled_out.inc();
            return Span::noop(collector_ref);
        }
        self.spans_recorded.inc();
        match traceparent.and_then(SpanContext::from_traceparent) {
            Some(remote_ctx) => Span::new_from_remote(name, kind, remote_ctx, collector_ref),
            None => Span::new_root(name, kind, collector_ref),
        }
    }

    /// Asks this thread's buffer whether the next span should be recorded.
    #[inline]
    fn should_record(&self) -> bool {
        LOCAL.with(|cell| {
            // SAFETY: the mutable reference is confined to this closure
            // and nothing invoked here re-enters LOCAL, so it is unique.
            let state = unsafe { &mut *cell.get() };
            state.get_or_register(self).should_record()
        })
    }

    /// Buffers a completed span on the calling thread; the batch moves to
    /// the shared outbox once the local buffer hits its flush threshold.
    #[inline]
    pub(crate) fn submit(&self, span: CompletedSpan) {
        LOCAL.with(|cell| {
            // SAFETY: see `should_record` — access is confined to the
            // closure and never re-entrant.
            let state = unsafe { &mut *cell.get() };
            state.get_or_register(self).push(span);
        });
    }

    /// Forces the calling thread's buffered spans (if any) into its outbox.
    pub fn flush_local(&self) {
        LOCAL.with(|cell| {
            // SAFETY: see `should_record`.
            let state = unsafe { &mut *cell.get() };
            let key = self as *const SpanCollector as usize;
            if let Some(pos) = state.entries.iter().position(|(k, _)| *k == key) {
                state.entries[pos].1.flush();
            }
        });
    }

    /// Moves all spans from every registered outbox into `buf`.
    ///
    /// Outboxes we are the sole owner of (their registering thread has
    /// exited) are pruned afterwards, so the registry does not grow
    /// without bound as short-lived threads come and go. This is safe:
    /// `ThreadBuffer::drop` flushes its remaining spans before its `Arc`
    /// is released, so a strong count of 1 observed under the `outboxes`
    /// lock (which registration also takes) means the final flush has
    /// already been drained above.
    pub fn drain_into(&self, buf: &mut Vec<CompletedSpan>) {
        let mut outboxes = self.outboxes.lock();
        for outbox in outboxes.iter() {
            let mut spans = outbox.spans.lock();
            buf.append(&mut spans);
            // Cap retained capacity so a burst doesn't pin memory forever.
            spans.shrink_to(FLUSH_THRESHOLD * 2);
        }
        outboxes.retain(|outbox| Arc::strong_count(outbox) > 1);
    }

    /// Always 0: spans shed when an outbox is saturated are not currently
    /// tracked (see `ThreadBuffer::flush`). Kept for interface
    /// compatibility.
    pub fn dropped_count(&self) -> u64 {
        0
    }

    /// Total spans recorded (i.e. not sampled out) so far.
    pub fn recorded_count(&self) -> u64 {
        self.spans_recorded.sum() as u64
    }

    /// Total spans skipped by back-pressure sampling so far.
    pub fn sampled_out_count(&self) -> u64 {
        self.spans_sampled_out.sum() as u64
    }

    /// Number of spans currently staged across all outboxes. Spans still
    /// sitting in thread-local buffers are not counted.
    pub fn len(&self) -> usize {
        let outboxes = self.outboxes.lock();
        outboxes.iter().map(|o| o.spans.lock().len()).sum()
    }

    /// True when every outbox is empty. Thread-local buffers may still
    /// hold unflushed spans.
    pub fn is_empty(&self) -> bool {
        let outboxes = self.outboxes.lock();
        outboxes.iter().all(|o| o.spans.lock().is_empty())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Pushes the current thread's buffered spans to its outbox, then
    // drains every outbox into `buf`.
    fn flush_and_drain(collector: &SpanCollector, buf: &mut Vec<CompletedSpan>) {
        collector.flush_local();
        collector.drain_into(buf);
    }

    #[test]
    fn start_and_drain() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            // Spans complete (and get submitted) when they drop at the end
            // of this scope.
            let _span = collector.start_span("op1", SpanKind::Server);
            let _span2 = collector.start_span("op2", SpanKind::Client);
        }
        let mut buf = Vec::new();
        flush_and_drain(&collector, &mut buf);
        assert_eq!(buf.len(), 2);
    }

    #[test]
    fn small_batches_no_drops() {
        // The constructor's capacity hints are ignored, so a tiny hint (2)
        // must not cause any of the 3 spans to be dropped.
        let collector = Arc::new(SpanCollector::new(1, 2));
        {
            let _s1 = collector.start_span("a", SpanKind::Internal);
            let _s2 = collector.start_span("b", SpanKind::Internal);
            let _s3 = collector.start_span("c", SpanKind::Internal);
        }
        let mut buf = Vec::new();
        flush_and_drain(&collector, &mut buf);
        assert_eq!(buf.len(), 3);
    }

    #[test]
    fn from_traceparent_valid() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        // Well-formed W3C traceparent: version-trace_id-parent_id-flags.
        let tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        {
            let _span =
                collector.start_span_from_traceparent(Some(tp), "handler", SpanKind::Server);
        }
        let mut buf = Vec::new();
        flush_and_drain(&collector, &mut buf);
        assert_eq!(buf.len(), 1);
        // The remote trace id must carry through to the recorded span.
        assert_eq!(
            buf[0].trace_id.to_string(),
            "4bf92f3577b34da6a3ce929d0e0e4736"
        );
    }

    #[test]
    fn from_traceparent_invalid_falls_back() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            // An unparseable header must degrade to a fresh root span, not
            // an error or a dropped span.
            let _span =
                collector.start_span_from_traceparent(Some("garbage"), "handler", SpanKind::Server);
        }
        let mut buf = Vec::new();
        flush_and_drain(&collector, &mut buf);
        assert_eq!(buf.len(), 1);
        assert!(!buf[0].trace_id.is_invalid());
        // Root spans have no parent.
        assert!(buf[0].parent_span_id.is_invalid());
    }

    #[test]
    fn from_traceparent_none_creates_root() {
        let collector = Arc::new(SpanCollector::new(1, 16));
        {
            let _span = collector.start_span_from_traceparent(None, "handler", SpanKind::Server);
        }
        let mut buf = Vec::new();
        flush_and_drain(&collector, &mut buf);
        assert_eq!(buf.len(), 1);
        assert!(buf[0].parent_span_id.is_invalid());
    }

    #[test]
    fn concurrent_submission() {
        let collector = Arc::new(SpanCollector::new(8, 1024));
        let mut handles = Vec::new();
        for t in 0..4 {
            let c = Arc::clone(&collector);
            handles.push(std::thread::spawn(move || {
                for i in 0..100 {
                    let _span =
                        c.start_span(format!("thread_{}_span_{}", t, i), SpanKind::Internal);
                }
            }));
        }
        for h in handles {
            h.join().expect("thread join");
        }
        // Thread exit flushes each thread's buffer (ThreadBuffer::drop),
        // so no explicit flush_local is needed here.
        let mut buf = Vec::new();
        collector.drain_into(&mut buf);
        assert_eq!(buf.len(), 400);
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn flush_threshold_batching() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        // One below the threshold: everything stays in the local buffer.
        for _ in 0..FLUSH_THRESHOLD - 1 {
            let _span = collector.start_span("sub_threshold", SpanKind::Internal);
        }
        assert_eq!(collector.len(), 0);
        {
            // The span crossing the threshold triggers a batch flush.
            let _span = collector.start_span("trigger", SpanKind::Internal);
        }
        assert_eq!(collector.len(), FLUSH_THRESHOLD);
    }

    #[test]
    fn flush_local_forces_transfer() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        for _ in 0..5 {
            let _span = collector.start_span("local", SpanKind::Internal);
        }
        // Below the threshold: nothing has reached the outbox yet...
        assert_eq!(collector.len(), 0);
        collector.flush_local();
        // ...until the flush is forced.
        assert_eq!(collector.len(), 5);
    }

    #[test]
    fn thread_exit_flushes() {
        let collector = Arc::new(SpanCollector::new(1, 64));
        let c = Arc::clone(&collector);
        let handle = std::thread::spawn(move || {
            for _ in 0..10 {
                let _span = c.start_span("thread_exit", SpanKind::Internal);
            }
        });
        handle.join().expect("thread join");
        // The thread-local state's Drop must have flushed all 10 spans.
        let mut buf = Vec::new();
        collector.drain_into(&mut buf);
        assert_eq!(buf.len(), 10);
    }
}