use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::atomic::{AtomicU64, Ordering, Ordering::Relaxed};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
use slab::Slab;
use tracing::metadata::LevelFilter;
use tracing::{Level, Metadata};
use crate::config::CacheConfig;
use crate::driver::{Driver, EventMessage};
use crate::id_encoding::{DISABLED, SLAB_OFFSET, disabled_id, id_to_u64, u64_to_id};
use crate::object_pool::ObjectPool;
use crate::predicate::{EnabledPredicate, Interest, LevelPredicate};
use crate::record::{EventRecord, FieldList, FieldVisitor, SpanRecord};
use crate::thread_state::{
ID_BATCH, ID_CURSOR, StackedSpan, THREAD_SENDERS, ThreadSenders, ensure_thread_shard_key,
pending_drain_events, pending_drain_spans, pending_push_event, pending_push_span, stack_pop,
stack_push, stack_top,
};
/// One shard ("lane") of in-flight span storage.
///
/// A lane pairs a mutex-protected slab of open spans with a parallel array of
/// atomically published actual ids; both are indexed by the same slab key.
pub(crate) struct ShardLane {
    /// Open spans of this lane, keyed by slab index.
    pub(crate) slab: Mutex<Slab<SpanRecord>>,
    /// `actual_ids[i]` holds the actual id most recently stored for slab index
    /// `i` (written with `Release` after insertion, read with `Acquire`). The
    /// slot is not cleared when the span closes.
    pub(crate) actual_ids: Box<[AtomicU64]>,
}
/// A `tracing` subscriber that keeps open spans in sharded slabs and hands
/// closed spans and events to a [`Driver`] over bounded channels.
///
/// A span's tracing id encodes `(shard, slab index)`. Each span also receives
/// a separate "actual id", allocated in per-thread batches, which is the key
/// used by [`SpanCache::get_span`] and [`SpanCache::page`].
pub struct SpanCache<P: EnabledPredicate = LevelPredicate> {
    /// One lane per shard; the number of lanes is a power of two.
    pub(crate) in_flight: Box<[ShardLane]>,
    /// Spans keyed by actual id, shared with the [`Driver`] (which presumably
    /// fills it from the span channel — confirm in `driver`).
    pub(crate) map: Arc<RwLock<BTreeMap<u64, SpanRecord>>>,
    /// Start of the next unclaimed batch of actual ids.
    pub(crate) id_high_water: AtomicU64,
    /// Decides which callsites, spans and events are recorded.
    pub(crate) predicate: P,
    /// Maximum number of simultaneously open spans per lane.
    pub(crate) shard_capacity: usize,
    /// Sends closed spans to the driver.
    pub(crate) span_sender: spillway::Sender<SpanRecord>,
    /// Sends events to the driver.
    pub(crate) event_sender: spillway::Sender<EventMessage>,
    /// Pending-buffer length at which buffered spans/events are flushed.
    pub(crate) pending_batch: usize,
    /// `lane count - 1`; selects a shard from a key or from decoded id bits.
    pub(crate) shard_mask: u64,
    /// Bit position of the shard index inside an encoded tracing id.
    pub(crate) shard_shift: u32,
    /// Pool from which event records are acquired.
    pub(crate) event_pool: Arc<ObjectPool<EventRecord>>,
}
impl SpanCache<LevelPredicate> {
    /// Builds a cache filtered by `LevelPredicate::new(Level::TRACE)` using the
    /// default configuration.
    ///
    /// `capacity` is the total number of spans that may be open at once.
    pub fn new(capacity: usize) -> (Self, Driver) {
        let trace_level = LevelPredicate::new(Level::TRACE);
        Self::with_predicate(capacity, trace_level)
    }

    /// Builds a cache filtered by `LevelPredicate::new(Level::TRACE)` using an
    /// explicit [`CacheConfig`].
    pub fn with_config(capacity: usize, config: CacheConfig) -> (Self, Driver) {
        let trace_level = LevelPredicate::new(Level::TRACE);
        Self::with_predicate_and_config(capacity, trace_level, config)
    }
}
impl<P: EnabledPredicate> SpanCache<P> {
    /// Builds a cache with a custom predicate and `CacheConfig::default()`.
    ///
    /// Returns the subscriber together with the [`Driver`] that owns the
    /// receiving ends of its span and event channels.
    pub fn with_predicate(capacity: usize, predicate: P) -> (Self, Driver) {
        Self::with_predicate_and_config(capacity, predicate, CacheConfig::default())
    }

    /// Builds a cache with a custom predicate and configuration.
    ///
    /// * `capacity` — total number of simultaneously open spans. It is divided
    ///   across lanes with `div_ceil`, so the effective total may be slightly
    ///   larger. A full lane disables newly created spans.
    /// * `config.lane_count` is clamped to `1..=256` and rounded up to a power
    ///   of two, so a shard can be selected with a bit mask.
    /// * `config.channel_capacity` is passed to both driver channels.
    /// * `config.pending_batch` is the pending-buffer length that triggers a
    ///   flush.
    pub fn with_predicate_and_config(
        capacity: usize,
        predicate: P,
        config: CacheConfig,
    ) -> (Self, Driver) {
        let lane_count = config.lane_count.clamp(1, 256).next_power_of_two();
        let shard_bits = lane_count.trailing_zeros();
        let shard_mask = (lane_count as u64) - 1;
        // The shard index occupies the top bits of a tracing id. At least one
        // bit is reserved so the shift stays below 64, even with a single lane.
        let shard_shift = 64 - shard_bits.max(1);
        let (span_sender, span_receiver) =
            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
        let (event_sender, event_receiver) =
            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
        let map = Arc::new(RwLock::new(BTreeMap::new()));
        let shard_capacity = capacity.div_ceil(lane_count);
        let in_flight: Box<[ShardLane]> = (0..lane_count)
            .map(|_| ShardLane {
                slab: Mutex::new(Slab::with_capacity(shard_capacity)),
                // One slot per possible slab key. A lane never holds more than
                // `shard_capacity` spans, so its keys stay below that bound.
                actual_ids: (0..shard_capacity)
                    .map(|_| AtomicU64::new(0))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let event_pool = ObjectPool::<EventRecord>::new(lane_count, 256);
        let cache = SpanCache {
            in_flight,
            map: Arc::clone(&map),
            id_high_water: AtomicU64::new(ID_BATCH),
            predicate,
            shard_capacity,
            span_sender,
            event_sender,
            pending_batch: config.pending_batch,
            shard_mask,
            shard_shift,
            event_pool,
        };
        let driver = Driver {
            map,
            span_receiver,
            event_receiver,
            capacity,
            side_events: BTreeMap::new(),
        };
        (cache, driver)
    }

    /// Number of lanes (shards); always a power of two.
    pub fn lane_count(&self) -> usize {
        self.in_flight.len()
    }

    /// Chooses a lane by masking the value returned by
    /// `ensure_thread_shard_key()`.
    #[inline]
    pub(crate) fn pick_shard(&self) -> usize {
        (ensure_thread_shard_key() & self.shard_mask) as usize
    }

    /// Returns the next actual id from the `ID_CURSOR` batch.
    ///
    /// When the cursor reaches a multiple of `ID_BATCH`, a fresh batch of
    /// `ID_BATCH` ids is claimed from `id_high_water`. Assumes `ID_BATCH` is a
    /// power of two and `ID_CURSOR` starts at a multiple of it — TODO confirm
    /// in `thread_state`.
    ///
    /// NOTE(review): `ID_CURSOR` is not keyed by cache, so with several
    /// `SpanCache` instances a thread may keep drawing from a batch claimed
    /// from another cache's `id_high_water`.
    #[inline]
    pub(crate) fn allocate_actual_id(&self) -> u64 {
        ID_CURSOR.with(|cell| {
            let cursor = cell.get();
            if (cursor & (ID_BATCH - 1)) != 0 {
                cell.set(cursor + 1);
                cursor
            } else {
                let start = self.id_high_water.fetch_add(ID_BATCH, Relaxed);
                cell.set(start + 1);
                start
            }
        })
    }

    /// Packs `(shard, slab_idx)` into a tracing id: the shard goes at and above
    /// bit `shard_shift`, and `slab_idx + SLAB_OFFSET` goes below it.
    #[inline]
    pub(crate) fn encode_tracing_id(&self, shard: usize, slab_idx: usize) -> u64 {
        ((shard as u64) << self.shard_shift) | ((slab_idx as u64) + SLAB_OFFSET)
    }

    /// Inverse of [`Self::encode_tracing_id`].
    ///
    /// Returns `None` in these cases:
    /// * `DISABLED`;
    /// * ids whose slab part is below `SLAB_OFFSET`;
    /// * ids whose slab index lies outside the lane (such ids were never
    ///   produced by this cache).
    #[inline]
    pub(crate) fn decode_tracing_id(&self, id: u64) -> Option<(usize, usize)> {
        if id == DISABLED {
            return None;
        }
        let slab_mask = (1u64 << self.shard_shift) - 1;
        let raw = id & slab_mask;
        if raw < SLAB_OFFSET {
            return None;
        }
        let slab_idx = raw - SLAB_OFFSET;
        // Without this check a foreign or garbage id would index past the end
        // of `actual_ids` and panic in `load_actual_id`. The comparison is done
        // in `u64` so the cast below cannot truncate on 32-bit targets.
        if slab_idx >= self.shard_capacity as u64 {
            return None;
        }
        let shard = ((id >> self.shard_shift) & self.shard_mask) as usize;
        Some((shard, slab_idx as usize))
    }

    /// Reads the actual id published for `slab_idx` in lane `shard`.
    ///
    /// Panics if either index is out of range. Indices obtained from
    /// `decode_tracing_id` are always in range.
    #[inline]
    pub(crate) fn load_actual_id(&self, shard: usize, slab_idx: usize) -> u64 {
        self.in_flight[shard].actual_ids[slab_idx].load(Ordering::Acquire)
    }

    /// Returns a clone of the span stored under `actual_id` in the shared map.
    pub fn get_span(&self, actual_id: u64) -> Option<SpanRecord> {
        #[allow(clippy::expect_used, reason = "poisoned lock")]
        let map = self.map.read().expect("lock must not be poisoned");
        map.get(&actual_id).cloned()
    }

    /// Empties the shared span map.
    ///
    /// In-flight lanes and pending buffers are not touched.
    pub fn clear(&self) {
        #[allow(clippy::expect_used, reason = "poisoned lock")]
        let mut map = self.map.write().expect("lock must not be poisoned");
        map.clear();
    }

    /// Maps a tracing id to the actual id currently published for its slot.
    ///
    /// Returns `None` for disabled or unrecognised ids.
    pub fn actual_id_for(&self, tracing_id: u64) -> Option<u64> {
        let (shard, slab_idx) = self.decode_tracing_id(tracing_id)?;
        Some(self.load_actual_id(shard, slab_idx))
    }

    /// Returns up to `limit` spans whose actual id is greater than `after_id`,
    /// in ascending id order. An `after_id` of `0` starts from the first entry.
    pub fn page(&self, after_id: u64, limit: usize) -> Vec<SpanRecord> {
        #[allow(clippy::expect_used, reason = "poisoned lock")]
        let map = self.map.read().expect("lock must not be poisoned");
        if after_id == 0 {
            map.values().take(limit).cloned().collect()
        } else {
            map.range((Excluded(after_id), Unbounded))
                .take(limit)
                .map(|(_, v)| v.clone())
                .collect()
        }
    }

    /// Sends the calling thread's buffered events and closed spans to the
    /// driver.
    ///
    /// Clones of this cache's senders are cached in `THREAD_SENDERS`. They are
    /// refreshed whenever the cached entry belongs to a different cache
    /// address. If a channel reports `Full`, the batch is dropped and a debug
    /// message is logged. Other send errors are ignored.
    ///
    /// NOTE(review): the pending buffers are drained without reference to a
    /// cache, so with several caches on one thread, items buffered for one may
    /// be flushed into another's channel.
    pub fn flush_pending(&self) {
        THREAD_SENDERS.with(|sc| {
            // SAFETY: assumes `THREAD_SENDERS` is a thread-local cell that
            // nothing else borrows while this closure runs. NOTE(review): if a
            // log→tracing bridge routes the `log::debug!` calls below back into
            // this subscriber and they trigger a nested `flush_pending`, this
            // `&mut` would alias — confirm no such bridge is installed.
            let slot = unsafe { &mut *sc.get() };
            // The cache's address identifies which cache the cached senders
            // belong to.
            let cache_addr = self as *const _ as usize;
            let needs_init = !matches!(slot, Some(t) if t.cache_addr == cache_addr);
            if needs_init {
                *slot = Some(ThreadSenders {
                    cache_addr,
                    span: self.span_sender.clone(),
                    event: self.event_sender.clone(),
                });
            }
            // SAFETY: `slot` was already `Some` or was just set to `Some` above.
            let senders = unsafe { slot.as_ref().unwrap_unchecked() };
            pending_drain_events(|events| {
                if events.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.event.send_many(events)
                {
                    log::debug!("event channel full; dropping a batch — driver is behind");
                }
            });
            pending_drain_spans(|spans| {
                if spans.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.span.send_many(spans)
                {
                    log::debug!("span channel full; dropping a batch — driver is behind");
                }
            });
        });
    }
}
impl<P: EnabledPredicate> tracing::Subscriber for SpanCache<P> {
fn max_level_hint(&self) -> Option<LevelFilter> {
self.predicate.max_level_hint()
}
fn register_callsite(
&self,
metadata: &'static Metadata<'static>,
) -> tracing::subscriber::Interest {
match self.predicate.callsite_enabled(metadata) {
Interest::Never => tracing::subscriber::Interest::never(),
Interest::Sometimes => tracing::subscriber::Interest::sometimes(),
Interest::Always => tracing::subscriber::Interest::always(),
}
}
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
if matches!(stack_top(), Some(s) if s.tracing_id == DISABLED) {
return false;
}
self.predicate.enabled(metadata)
}
fn event_enabled(&self, event: &tracing::Event<'_>) -> bool {
self.predicate.enabled(event.metadata())
}
fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
let parent_actual_id: Option<u64> = if attrs.is_contextual() {
match stack_top() {
None => return disabled_id(),
Some(top) if top.tracing_id == DISABLED => return disabled_id(),
Some(top) => Some(top.actual_id),
}
} else if attrs.is_root() {
if stack_top().is_some() {
log::warn!("root span created with an active span on the stack; disabling");
return disabled_id();
}
None
} else {
let Some(parent) = attrs.parent() else {
return disabled_id();
};
let explicit = id_to_u64(parent);
match self.decode_tracing_id(explicit) {
Some((p_shard, p_slab)) => Some(self.load_actual_id(p_shard, p_slab)),
None => return disabled_id(),
}
};
if !self.predicate.new_span_enabled(attrs) {
return disabled_id();
}
let actual_id = self.allocate_actual_id();
let mut record = SpanRecord {
id: actual_id,
parent_id: parent_actual_id,
metadata: attrs.metadata(),
fields: FieldList::new(),
events: Vec::new(),
opened_at: Instant::now(),
closed_at: None,
};
attrs.record(&mut FieldVisitor {
fields: &mut record.fields,
});
let shard = self.pick_shard();
let lane = &self.in_flight[shard];
let slab_idx = {
#[allow(clippy::expect_used, reason = "poisoned lock")]
let mut slab = lane.slab.lock().expect("lock must not be poisoned");
if slab.len() >= self.shard_capacity {
log::warn!(
"span shard {shard} full; new span disabled. \
Increase capacity or reduce span rate."
);
return disabled_id();
}
slab.insert(record)
};
lane.actual_ids[slab_idx].store(actual_id, Ordering::Release);
u64_to_id(self.encode_tracing_id(shard, slab_idx))
}
fn record(&self, span: &tracing::span::Id, values: &tracing::span::Record<'_>) {
let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(span)) {
Some(t) => t,
None => return,
};
#[allow(clippy::expect_used, reason = "poisoned lock")]
let mut shard_lock = self.in_flight[shard]
.slab
.lock()
.expect("lock must not be poisoned");
if let Some(rec) = shard_lock.get_mut(slab_idx) {
values.record(&mut FieldVisitor {
fields: &mut rec.fields,
});
}
}
fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
fn event(&self, event: &tracing::Event<'_>) {
let parent_actual_id = match event.parent().map(id_to_u64) {
Some(tracing_id) => {
if tracing_id == DISABLED {
log::debug!("event dropped: parent span is disabled");
return;
}
match self.decode_tracing_id(tracing_id) {
Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
None => return,
}
}
None => match stack_top() {
Some(top) if top.tracing_id == DISABLED => {
log::debug!("event dropped: parent span is disabled");
return;
}
Some(top) => top.actual_id,
None => {
log::debug!("event dropped: no active span");
return;
}
},
};
let mut record = self.event_pool.acquire();
record.metadata = Some(event.metadata());
record.recorded_at = Some(Instant::now());
record.fields.clear();
event.record(&mut FieldVisitor {
fields: &mut record.fields,
});
if pending_push_event(EventMessage {
parent_actual_id,
record,
}) >= self.pending_batch
{
self.flush_pending();
}
}
fn enter(&self, span: &tracing::span::Id) {
let tracing_id = id_to_u64(span);
let actual_id = match self.decode_tracing_id(tracing_id) {
Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
None => 0, };
stack_push(StackedSpan {
tracing_id,
actual_id,
});
}
fn exit(&self, _span: &tracing::span::Id) {
stack_pop();
}
fn try_close(&self, id: tracing::span::Id) -> bool {
let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(&id)) {
Some(t) => t,
None => return false,
};
#[allow(clippy::expect_used, reason = "poisoned lock")]
let record = self.in_flight[shard]
.slab
.lock()
.expect("lock must not be poisoned")
.try_remove(slab_idx);
if let Some(mut record) = record {
record.closed_at = Some(Instant::now());
if pending_push_span(record) >= self.pending_batch {
self.flush_pending();
}
true
} else {
false
}
}
}