tracing_cache/cache.rs
//! The `SpanCache` subscriber and its in-flight slab shards.
//!
//! Open spans live in `Box<[ShardLane]>`, each lane a `Mutex<Slab>` plus
//! a parallel sidecar of `actual_id`s readable without locking. When a
//! span closes, its `SpanRecord` is moved to the per-thread `PENDING`
//! buffer (`thread_state::pending_push_span`) and eventually flushed to the
//! spillway channel that `Driver` consumes. Events take the same route
//! through a separate per-thread buffer and their own channel.
8
9use std::collections::BTreeMap;
10use std::ops::Bound::{Excluded, Unbounded};
11use std::sync::atomic::{AtomicU64, Ordering, Ordering::Relaxed};
12use std::sync::{Arc, Mutex, RwLock};
13use std::time::Instant;
14
15use slab::Slab;
16use tracing::metadata::LevelFilter;
17use tracing::{Level, Metadata};
18
19use crate::config::CacheConfig;
20use crate::driver::{Driver, EventMessage};
21use crate::id_encoding::{DISABLED, SLAB_OFFSET, disabled_id, id_to_u64, u64_to_id};
22use crate::object_pool::ObjectPool;
23use crate::predicate::{EnabledPredicate, Interest, LevelPredicate};
24use crate::record::{EventRecord, FieldList, FieldVisitor, SpanRecord};
25use crate::thread_state::{
26 ID_BATCH, ID_CURSOR, StackedSpan, THREAD_SENDERS, ThreadSenders, ensure_thread_shard_key,
27 pending_drain_events, pending_drain_spans, pending_push_event, pending_push_span, stack_pop,
28 stack_push, stack_top,
29};
30
31/// One slab shard plus a parallel sidecar of `actual_id`s that's readable
32/// without locking the slab. The sidecar is sized to `shard_capacity`
33/// (the slab's growth bound) and indexed by `slab_idx`. `new_span`
34/// writes the new span's `actual_id` with `Release` ordering before
35/// publishing the resulting tracing id; `enter` reads with `Acquire` to
36/// see that value.
37pub(crate) struct ShardLane {
38 pub(crate) slab: Mutex<Slab<SpanRecord>>,
39 pub(crate) actual_ids: Box<[AtomicU64]>,
40}
41
42/// A `tracing::Subscriber` that holds spans in memory for inspection.
43///
44/// Open spans live in a sharded `Box<[Mutex<Slab<SpanRecord>>]>`, with
45/// the lane count set by [`CacheConfig::lane_count`] (default
46/// [`crate::DEFAULT_LANE_COUNT`]). The shard is picked by a thread-local
47/// key; the slab gives an O(1) cache-friendly index, and the user-facing
48/// `tracing::span::Id` packs `(shard, slab_idx+2)` into a single u64 so
49/// `SPAN_STACK` push/pop and trait-method dispatch don't need a separate
50/// lookup. When a span closes it moves to a per-thread buffer and is
51/// flushed to the [`Driver`] via a spillway channel.
52///
53/// `SpanRecord.id` is an `actual_id` (separate from the tracing id) that
54/// serves as the `BTreeMap` key, since slab indices are reused. IDs are
55/// monotonic within a thread's `ID_BATCH`-sized reservation; across
56/// threads they interleave, so map order reflects allocation order
57/// per-thread but not strict global creation order.
58///
59/// Create with [`SpanCache::new`] / [`SpanCache::with_predicate`]
60/// (defaults) or [`SpanCache::with_config`] /
61/// [`SpanCache::with_predicate_and_config`] (custom batch sizes & lane
62/// count). Each returns `(SpanCache, Driver)`; spawn the [`Driver`] as
63/// a background task to commit closed spans.
64pub struct SpanCache<P: EnabledPredicate = LevelPredicate> {
65 pub(crate) in_flight: Box<[ShardLane]>,
66 pub(crate) map: Arc<RwLock<BTreeMap<u64, SpanRecord>>>,
67 /// High-water mark for the `actual_id` space (the `SpanRecord.id` /
68 /// `BTreeMap` key, disjoint from the encoded tracing id space).
69 /// Threads claim `ID_BATCH`-sized slices via `fetch_add` and hand IDs
70 /// out from a thread-local reservation, so this counter is touched
71 /// roughly once per `ID_BATCH` spans rather than once per span.
72 pub(crate) id_high_water: AtomicU64,
73 pub(crate) predicate: P,
74 /// Per-shard capacity. Total open-span budget is
75 /// `shard_capacity * lane_count`.
76 pub(crate) shard_capacity: usize,
77 pub(crate) span_sender: spillway::Sender<SpanRecord>,
78 pub(crate) event_sender: spillway::Sender<EventMessage>,
79 pub(crate) pending_batch: usize,
80 pub(crate) shard_mask: u64, // lane_count - 1
81 pub(crate) shard_shift: u32, // 64 - log2(lane_count); shard at top of id
82 /// Shared pool of pre-allocated `EventRecord`s. `event()` acquires
83 /// from the per-thread shard, fills the record, and pushes the
84 /// `ReuseRef` onto the parent span's events vec. When the
85 /// `SpanRecord` finally drops (BTreeMap eviction), each `ReuseRef`
86 /// returns its allocation to the pool — amortising away the
87 /// per-event `Box::new` cost the previous `Vec<EventRecord>` paid.
88 pub(crate) event_pool: Arc<ObjectPool<EventRecord>>,
89}
90
91impl SpanCache<LevelPredicate> {
92 /// Default predicate (TRACE), default config.
93 pub fn new(capacity: usize) -> (Self, Driver) {
94 Self::with_predicate(capacity, LevelPredicate::new(Level::TRACE))
95 }
96
97 /// Default predicate (TRACE) with a custom [`CacheConfig`].
98 pub fn with_config(capacity: usize, config: CacheConfig) -> (Self, Driver) {
99 Self::with_predicate_and_config(capacity, LevelPredicate::new(Level::TRACE), config)
100 }
101}
102
103impl<P: EnabledPredicate> SpanCache<P> {
104 /// Custom predicate, default [`CacheConfig`].
105 pub fn with_predicate(capacity: usize, predicate: P) -> (Self, Driver) {
106 Self::with_predicate_and_config(capacity, predicate, CacheConfig::default())
107 }
108
109 /// Custom predicate and custom [`CacheConfig`].
110 pub fn with_predicate_and_config(
111 capacity: usize,
112 predicate: P,
113 config: CacheConfig,
114 ) -> (Self, Driver) {
115 // Silently clamp to [1, 256] and round up to the next power of two.
116 let lane_count = config.lane_count.clamp(1, 256).next_power_of_two();
117 let shard_bits = lane_count.trailing_zeros();
118 let shard_mask = (lane_count as u64) - 1;
119 // Reserve at least one bit at the top so `(shard as u64) << shift`
120 // is well-defined even when lane_count == 1.
121 let shard_shift = 64 - shard_bits.max(1);
122
123 // Bound both channels so a faster producer than consumer (e.g. a
124 // 16-core Graviton with 4 async workers vs. one driver task) can't
125 // grow spillway's internal buffers without bound and exhaust RAM.
126 // `send_many` rejects the whole batch with `Error::Full` when the
127 // limit is exceeded; `flush_pending` discards the rejected drain.
128 // Concurrency matches `lane_count` so each lane's threads tend to
129 // land on their own chute and contend less with peers (spillway's
130 // chute count caps useful per-clone parallelism).
131 let (span_sender, span_receiver) =
132 spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
133 let (event_sender, event_receiver) =
134 spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
135 let map = Arc::new(RwLock::new(BTreeMap::new()));
136 let shard_capacity = capacity.div_ceil(lane_count);
137 let in_flight: Box<[ShardLane]> = (0..lane_count)
138 .map(|_| ShardLane {
139 slab: Mutex::new(Slab::with_capacity(shard_capacity)),
140 actual_ids: (0..shard_capacity)
141 .map(|_| AtomicU64::new(0))
142 .collect::<Vec<_>>()
143 .into_boxed_slice(),
144 })
145 .collect::<Vec<_>>()
146 .into_boxed_slice();
147
148 // Event pool sharded the same way as the slab (one per lane);
149 // per-shard capacity is generous so steady-state event traffic
150 // never spills. These are tiny structs (~360 B with the
151 // inline-8 FieldList), 256 × 16 lanes = ~1.5 MB worst case.
152 let event_pool = ObjectPool::<EventRecord>::new(lane_count, 256);
153
154 let cache = SpanCache {
155 in_flight,
156 map: Arc::clone(&map),
157 // Initialise to ID_BATCH so every `fetch_add(ID_BATCH)`
158 // returns a batch-aligned start; that's what makes the
159 // mask-based "cursor at boundary ⇒ refill" check work in
160 // `allocate_actual_id`.
161 id_high_water: AtomicU64::new(ID_BATCH),
162 predicate,
163 shard_capacity,
164 span_sender,
165 event_sender,
166 pending_batch: config.pending_batch,
167 shard_mask,
168 shard_shift,
169 event_pool,
170 };
171 let driver = Driver {
172 map,
173 span_receiver,
174 event_receiver,
175 capacity,
176 side_events: std::collections::BTreeMap::new(),
177 };
178 (cache, driver)
179 }
180
181 /// Number of in-flight slab shards this cache uses.
182 pub fn lane_count(&self) -> usize {
183 self.in_flight.len()
184 }
185
186 #[inline]
187 pub(crate) fn pick_shard(&self) -> usize {
188 (ensure_thread_shard_key() & self.shard_mask) as usize
189 }
190
191 /// Claim the next `actual_id` from this thread's reservation against
192 /// `id_high_water`, refilling via `fetch_add(ID_BATCH)` when the
193 /// reservation is exhausted (`cursor & (ID_BATCH - 1) == 0`).
194 /// Touches the shared atomic ~once per `ID_BATCH` spans rather than
195 /// once per span.
196 #[inline]
197 pub(crate) fn allocate_actual_id(&self) -> u64 {
198 ID_CURSOR.with(|cell| {
199 let cursor = cell.get();
200 if (cursor & (ID_BATCH - 1)) != 0 {
201 cell.set(cursor + 1);
202 cursor
203 } else {
204 let start = self.id_high_water.fetch_add(ID_BATCH, Relaxed);
205 cell.set(start + 1);
206 start
207 }
208 })
209 }
210
211 #[inline]
212 pub(crate) fn encode_tracing_id(&self, shard: usize, slab_idx: usize) -> u64 {
213 ((shard as u64) << self.shard_shift) | ((slab_idx as u64) + SLAB_OFFSET)
214 }
215
216 /// Decode a tracing id into `(shard, slab_idx)`. Returns `None` for
217 /// `DISABLED` or anything outside the encoding scheme.
218 #[inline]
219 pub(crate) fn decode_tracing_id(&self, id: u64) -> Option<(usize, usize)> {
220 if id == DISABLED {
221 return None;
222 }
223 let slab_mask = (1u64 << self.shard_shift) - 1;
224 let raw = id & slab_mask;
225 if raw < SLAB_OFFSET {
226 return None;
227 }
228 let shard = ((id >> self.shard_shift) & self.shard_mask) as usize;
229 Some((shard, (raw - SLAB_OFFSET) as usize))
230 }
231
232 /// Read a slab slot's `actual_id` from the lock-free sidecar.
233 /// `Acquire` pairs with the `Release` store in `new_span` so the
234 /// value is visible once the encoded tracing id has been published
235 /// to the caller.
236 #[inline]
237 pub(crate) fn load_actual_id(&self, shard: usize, slab_idx: usize) -> u64 {
238 self.in_flight[shard].actual_ids[slab_idx].load(Ordering::Acquire)
239 }
240
241 /// Returns a closed span by its actual_id (`SpanRecord.id`). This is
242 /// the id stored in `parent_id` and used as the BTreeMap key.
243 /// Only closed spans are reachable here.
244 pub fn get_span(&self, actual_id: u64) -> Option<SpanRecord> {
245 #[allow(clippy::expect_used, reason = "poisoned lock")]
246 let map = self.map.read().expect("lock must not be poisoned");
247 map.get(&actual_id).cloned()
248 }
249
250 /// Drop every closed span currently in the BTreeMap. Called by
251 /// the host when the cache-recording level transitions to `OFF`
252 /// so a paused host doesn't keep stale data around to confuse the
253 /// next session. In-flight spans (still open in the slabs) are
254 /// not affected; if any close after this call they'll repopulate
255 /// the map as normal.
256 pub fn clear(&self) {
257 #[allow(clippy::expect_used, reason = "poisoned lock")]
258 let mut map = self.map.write().expect("lock must not be poisoned");
259 map.clear();
260 }
261
262 /// Resolve the `actual_id` (i.e. the [`SpanRecord::id`] used as the
263 /// `BTreeMap` key after close) for an in-flight span addressed by
264 /// its `tracing::span::Id` u64. Lock-free `Acquire` load from the
265 /// per-shard sidecar — does not touch the slab `Mutex`.
266 pub fn actual_id_for(&self, tracing_id: u64) -> Option<u64> {
267 let (shard, slab_idx) = self.decode_tracing_id(tracing_id)?;
268 Some(self.load_actual_id(shard, slab_idx))
269 }
270
271 /// Returns closed spans in ascending actual_id order. Open spans are
272 /// not included; call [`Self::flush_pending`] + [`Driver::drain_sync`]
273 /// first if you need just-closed spans to appear.
274 pub fn page(&self, after_id: u64, limit: usize) -> Vec<SpanRecord> {
275 #[allow(clippy::expect_used, reason = "poisoned lock")]
276 let map = self.map.read().expect("lock must not be poisoned");
277 if after_id == 0 {
278 map.values().take(limit).cloned().collect()
279 } else {
280 map.range((Excluded(after_id), Unbounded))
281 .take(limit)
282 .map(|(_, v)| v.clone())
283 .collect()
284 }
285 }
286
287 /// Drains the calling thread's two PENDING buffers (spans + events)
288 /// into their respective spillway channels. Must be called before
289 /// [`Driver::drain_sync`] in tests to ensure all recently-closed
290 /// spans and emitted events are delivered.
291 pub fn flush_pending(&self) {
292 THREAD_SENDERS.with(|sc| {
293 // SAFETY: cell is thread-local and we only hold the &mut for
294 // the duration of this closure; nothing inside re-enters
295 // THREAD_SENDERS.
296 let slot = unsafe { &mut *sc.get() };
297 let cache_addr = self as *const _ as usize;
298 let needs_init = !matches!(slot, Some(t) if t.cache_addr == cache_addr);
299 if needs_init {
300 *slot = Some(ThreadSenders {
301 cache_addr,
302 span: self.span_sender.clone(),
303 event: self.event_sender.clone(),
304 });
305 }
306 // SAFETY: `slot` was just guaranteed to be `Some`.
307 let senders = unsafe { slot.as_ref().unwrap_unchecked() };
308 // Avoid send_many on empty drains — spillway's chute
309 // invariant rejects sender clones that publish without
310 // having ever held content. On `Error::Full`, the rejected
311 // drain is bound to the match arm and dropped, which drops
312 // each unsent record (and runs `ReuseRef::Drop` for events,
313 // returning the EventRecord allocation to the pool).
314 pending_drain_events(|events| {
315 if events.len() > 0
316 && let Err(spillway::Error::Full(_dropped)) = senders.event.send_many(events)
317 {
318 log::debug!("event channel full; dropping a batch — driver is behind");
319 }
320 });
321 pending_drain_spans(|spans| {
322 if spans.len() > 0
323 && let Err(spillway::Error::Full(_dropped)) = senders.span.send_many(spans)
324 {
325 log::debug!("span channel full; dropping a batch — driver is behind");
326 }
327 });
328 });
329 }
330}
331
332// ── Subscriber impl ──────────────────────────────────────────────────────────
333
334impl<P: EnabledPredicate> tracing::Subscriber for SpanCache<P> {
335 fn max_level_hint(&self) -> Option<LevelFilter> {
336 self.predicate.max_level_hint()
337 }
338
339 fn register_callsite(
340 &self,
341 metadata: &'static Metadata<'static>,
342 ) -> tracing::subscriber::Interest {
343 match self.predicate.callsite_enabled(metadata) {
344 Interest::Never => tracing::subscriber::Interest::never(),
345 Interest::Sometimes => tracing::subscriber::Interest::sometimes(),
346 Interest::Always => tracing::subscriber::Interest::always(),
347 }
348 }
349
350 fn enabled(&self, metadata: &Metadata<'_>) -> bool {
351 if matches!(stack_top(), Some(s) if s.tracing_id == DISABLED) {
352 return false;
353 }
354 self.predicate.enabled(metadata)
355 }
356
357 fn event_enabled(&self, event: &tracing::Event<'_>) -> bool {
358 self.predicate.enabled(event.metadata())
359 }
360
361 fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
362 // Step A: resolve parent's actual_id from a side-channel — no
363 // slab lock. Contextual: the parent's `actual_id` is right there
364 // on the SPAN_STACK entry. Explicit: the parent's tracing id
365 // encodes its slab address, and the sidecar holds its actual_id.
366 let parent_actual_id: Option<u64> = if attrs.is_contextual() {
367 match stack_top() {
368 None => return disabled_id(),
369 Some(top) if top.tracing_id == DISABLED => return disabled_id(),
370 Some(top) => Some(top.actual_id),
371 }
372 } else if attrs.is_root() {
373 if stack_top().is_some() {
374 log::warn!("root span created with an active span on the stack; disabling");
375 return disabled_id();
376 }
377 None
378 } else {
379 // `attrs.parent()` is `Some` in this arm: the `else`
380 // branch is reached after `!is_contextual() && !is_root()`,
381 // which leaves only the "explicit parent" case in
382 // tracing's span-attributes model.
383 let Some(parent) = attrs.parent() else {
384 return disabled_id();
385 };
386 let explicit = id_to_u64(parent);
387 match self.decode_tracing_id(explicit) {
388 // Lock-free read from the sidecar: the parent's actual_id
389 // was published by a `Release` store before the parent's
390 // tracing id was returned to the caller, so this `Acquire`
391 // load sees it.
392 Some((p_shard, p_slab)) => Some(self.load_actual_id(p_shard, p_slab)),
393 None => return disabled_id(),
394 }
395 };
396
397 // Step B: predicate check.
398 if !self.predicate.new_span_enabled(attrs) {
399 return disabled_id();
400 }
401
402 // Step C: build record outside the lock so field-visitor work
403 // doesn't extend the critical section.
404 let actual_id = self.allocate_actual_id();
405 let mut record = SpanRecord {
406 id: actual_id,
407 parent_id: parent_actual_id,
408 metadata: attrs.metadata(),
409 fields: FieldList::new(),
410 events: Vec::new(),
411 opened_at: Instant::now(),
412 closed_at: None,
413 };
414 attrs.record(&mut FieldVisitor {
415 fields: &mut record.fields,
416 });
417
418 // Step D: pick our shard, capacity-check + slab.insert under the
419 // Mutex, then drop the guard before the sidecar store and the
420 // pure-arithmetic id encoding. The sidecar is a separate atomic
421 // and the Release/Acquire pair on `actual_ids[slab_idx]` is its
422 // own happens-before — nobody can observe this slot until we
423 // return the tracing id below, so the store doesn't need to be
424 // sequenced under the slab Mutex.
425 let shard = self.pick_shard();
426 let lane = &self.in_flight[shard];
427 let slab_idx = {
428 #[allow(clippy::expect_used, reason = "poisoned lock")]
429 let mut slab = lane.slab.lock().expect("lock must not be poisoned");
430 if slab.len() >= self.shard_capacity {
431 log::warn!(
432 "span shard {shard} full; new span disabled. \
433 Increase capacity or reduce span rate."
434 );
435 return disabled_id();
436 }
437 slab.insert(record)
438 };
439 // The sidecar is sized to `shard_capacity` and indexed by
440 // slab_idx, which is bounded by capacity per the check above.
441 lane.actual_ids[slab_idx].store(actual_id, Ordering::Release);
442 u64_to_id(self.encode_tracing_id(shard, slab_idx))
443 }
444
445 fn record(&self, span: &tracing::span::Id, values: &tracing::span::Record<'_>) {
446 let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(span)) {
447 Some(t) => t,
448 None => return,
449 };
450 #[allow(clippy::expect_used, reason = "poisoned lock")]
451 let mut shard_lock = self.in_flight[shard]
452 .slab
453 .lock()
454 .expect("lock must not be poisoned");
455 if let Some(rec) = shard_lock.get_mut(slab_idx) {
456 values.record(&mut FieldVisitor {
457 fields: &mut rec.fields,
458 });
459 }
460 }
461
462 fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
463
464 fn event(&self, event: &tracing::Event<'_>) {
465 // Resolve the parent's `actual_id` lock-free. Contextual events
466 // get it straight off the SPAN_STACK entry; events with an
467 // explicit parent decode the tracing id and `Acquire`-load from
468 // the per-shard sidecar — no slab lock either way.
469 let parent_actual_id = match event.parent().map(id_to_u64) {
470 Some(tracing_id) => {
471 if tracing_id == DISABLED {
472 log::debug!("event dropped: parent span is disabled");
473 return;
474 }
475 match self.decode_tracing_id(tracing_id) {
476 Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
477 None => return,
478 }
479 }
480 None => match stack_top() {
481 Some(top) if top.tracing_id == DISABLED => {
482 log::debug!("event dropped: parent span is disabled");
483 return;
484 }
485 Some(top) => top.actual_id,
486 None => {
487 log::debug!("event dropped: no active span");
488 return;
489 }
490 },
491 };
492
493 // Acquire a pooled EventRecord, fill it in place. The pooled
494 // FieldList allocation is preserved across reuse.
495 let mut record = self.event_pool.acquire();
496 record.metadata = Some(event.metadata());
497 record.recorded_at = Some(Instant::now());
498 record.fields.clear();
499 event.record(&mut FieldVisitor {
500 fields: &mut record.fields,
501 });
502
503 // Hand off to the driver via the event PENDING — no slab lock
504 // here. The driver attaches to the parent's `events` vec
505 // (directly if the parent's already in the map, or via the
506 // side buffer if the event raced ahead of the span).
507 if pending_push_event(EventMessage {
508 parent_actual_id,
509 record,
510 }) >= self.pending_batch
511 {
512 self.flush_pending();
513 }
514 }
515
516 fn enter(&self, span: &tracing::span::Id) {
517 // Resolve actual_id once, lock-free, and stash it on the stack so
518 // a contextual `new_span` underneath can read its parent's
519 // actual_id without locking the parent's slab.
520 let tracing_id = id_to_u64(span);
521 let actual_id = match self.decode_tracing_id(tracing_id) {
522 Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
523 None => 0, // DISABLED entry — actual_id is never read.
524 };
525 stack_push(StackedSpan {
526 tracing_id,
527 actual_id,
528 });
529 }
530
531 fn exit(&self, _span: &tracing::span::Id) {
532 stack_pop();
533 }
534
535 fn try_close(&self, id: tracing::span::Id) -> bool {
536 let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(&id)) {
537 Some(t) => t,
538 None => return false,
539 };
540
541 // Single slab lookup via `try_remove` (no contains-then-remove
542 // double hash), and `Instant::now()` lives outside the critical
543 // section — only paid on the success path.
544 #[allow(clippy::expect_used, reason = "poisoned lock")]
545 let record = self.in_flight[shard]
546 .slab
547 .lock()
548 .expect("lock must not be poisoned")
549 .try_remove(slab_idx);
550
551 if let Some(mut record) = record {
552 record.closed_at = Some(Instant::now());
553 if pending_push_span(record) >= self.pending_batch {
554 self.flush_pending();
555 }
556 true
557 } else {
558 false
559 }
560 }
561}