tracing_cache/cache.rs
1//! The `SpanCache` subscriber and its in-flight slab shards.
2//!
3//! Open spans live in `Box<[ShardLane]>`, each lane a `Mutex<Slab>` plus
4//! a parallel sidecar of `actual_id`s readable without locking. When a
5//! span closes, its `SpanRecord` is moved to the per-thread `PENDING`
//! buffer (`thread_state::pending_push_span`) and eventually flushed to the spillway
7//! channel that `Driver` consumes.
8//!
9//! Closed spans are streamed to consumers via [`SpanCache::subscribe`]
10//! — the driver fans each freshly-committed `SpanRecord` out to every
11//! live subscriber and then drops it. The cache holds no historical
12//! state, so consumers that need history must subscribe before the
13//! spans of interest are emitted. Subscribers that have dropped their
14//! receiver are pruned lazily on the next send.
15
16use std::sync::atomic::{AtomicU64, Ordering, Ordering::Relaxed};
17use std::sync::{Arc, Mutex};
18use std::time::Instant;
19
20use slab::Slab;
21use tracing::metadata::LevelFilter;
22use tracing::{Level, Metadata};
23
24use crate::config::CacheConfig;
25use crate::driver::{Driver, EventMessage};
26use crate::id_encoding::{DISABLED, SLAB_OFFSET, disabled_id, id_to_u64, u64_to_id};
27use crate::object_pool::ObjectPool;
28use crate::predicate::{EnabledPredicate, Interest, LevelPredicate};
29use crate::record::{EventRecord, FieldList, FieldVisitor, SpanRecord};
30use crate::thread_state::{
31 ID_BATCH, ID_CURSOR, StackedSpan, THREAD_SENDERS, ThreadSenders, ensure_thread_shard_key,
32 pending_drain_events, pending_drain_spans, pending_push_event, pending_push_span, stack_pop,
33 stack_push, stack_top,
34};
35
36/// One slab shard plus a parallel sidecar of `actual_id`s that's readable
37/// without locking the slab. The sidecar is sized to `shard_capacity`
38/// (the slab's growth bound) and indexed by `slab_idx`. `new_span`
39/// writes the new span's `actual_id` with `Release` ordering before
40/// publishing the resulting tracing id; `enter` reads with `Acquire` to
41/// see that value.
42pub(crate) struct ShardLane {
43 pub(crate) slab: Mutex<Slab<SpanRecord>>,
44 pub(crate) actual_ids: Box<[AtomicU64]>,
45}
46
47/// A `tracing::Subscriber` that funnels closed spans to live consumers.
48///
49/// Open spans live in a sharded `Box<[Mutex<Slab<SpanRecord>>]>`, with
50/// the lane count set by [`CacheConfig::lane_count`] (default
51/// [`crate::DEFAULT_LANE_COUNT`]). The shard is picked by a thread-local
52/// key; the slab gives an O(1) cache-friendly index, and the user-facing
53/// `tracing::span::Id` packs `(shard, slab_idx+2)` into a single u64 so
54/// `SPAN_STACK` push/pop and trait-method dispatch don't need a separate
55/// lookup. When a span closes it moves to a per-thread buffer and is
56/// flushed to the [`Driver`] via a spillway channel.
57///
58/// The cache itself holds no closed spans — the driver fans each one
59/// out to [`SpanCache::subscribe`] receivers and then drops it. Late
60/// events (an event whose parent span has already been fanned out)
61/// have nowhere to land and are dropped.
62///
63/// `SpanRecord.id` is an `actual_id` (separate from the tracing id),
64/// monotonic within a thread's `ID_BATCH`-sized reservation; across
65/// threads they interleave, so subscriber-observed order is
66/// driver-commit (close-time) order, not strict global open order.
67///
68/// Create with [`SpanCache::new`] / [`SpanCache::with_predicate`]
69/// (defaults) or [`SpanCache::with_config`] /
70/// [`SpanCache::with_predicate_and_config`] (custom batch sizes & lane
71/// count). Each returns `(SpanCache, Driver)`; spawn the [`Driver`] as
72/// a background task to fan closed spans out to subscribers.
73pub struct SpanCache<P: EnabledPredicate = LevelPredicate> {
74 pub(crate) in_flight: Box<[ShardLane]>,
75 /// High-water mark for the `actual_id` space — the `SpanRecord.id`
76 /// stored on every record and referenced via `parent_id`, disjoint
77 /// from the encoded tracing id space. Threads claim `ID_BATCH`-
78 /// sized slices via `fetch_add` and hand IDs out from a thread-
79 /// local reservation, so this counter is touched roughly once per
80 /// `ID_BATCH` spans rather than once per span.
81 pub(crate) id_high_water: AtomicU64,
82 pub(crate) predicate: P,
83 /// Per-shard capacity. Total open-span budget is
84 /// `shard_capacity * lane_count`.
85 pub(crate) shard_capacity: usize,
86 pub(crate) span_sender: spillway::Sender<SpanRecord>,
87 pub(crate) event_sender: spillway::Sender<EventMessage>,
88 pub(crate) pending_batch: usize,
89 pub(crate) shard_mask: u64, // lane_count - 1
90 pub(crate) shard_shift: u32, // 64 - log2(lane_count); shard at top of id
91 /// Shared pool of pre-allocated `EventRecord`s. `event()` acquires
92 /// from the per-thread shard, fills the record, and pushes the
93 /// `ReuseRef` onto the parent span's events vec. When the
94 /// `SpanRecord` finally drops (after fan-out), each `ReuseRef`
95 /// returns its allocation to the pool — amortising away the
96 /// per-event `Box::new` cost.
97 pub(crate) event_pool: Arc<ObjectPool<EventRecord>>,
98 /// Live consumers of closed spans. [`SpanCache::subscribe`] pushes a
99 /// fresh `Sender` here and hands the matching `Receiver` to the
100 /// caller; the [`Driver`] holds an `Arc` clone of this `Mutex` and
101 /// fans out each committed `SpanRecord` to every entry. Senders
102 /// that return `Error::Closed` (receiver dropped) are removed by
103 /// the driver on the next fan-out.
104 pub(crate) subscribers: Arc<Mutex<Vec<spillway::Sender<SpanRecord>>>>,
105}
106
107impl SpanCache<LevelPredicate> {
108 /// Default predicate (TRACE), default config.
109 pub fn new(capacity: usize) -> (Self, Driver) {
110 Self::with_predicate(capacity, LevelPredicate::new(Level::TRACE))
111 }
112
113 /// Default predicate (TRACE) with a custom [`CacheConfig`].
114 pub fn with_config(capacity: usize, config: CacheConfig) -> (Self, Driver) {
115 Self::with_predicate_and_config(capacity, LevelPredicate::new(Level::TRACE), config)
116 }
117}
118
119impl<P: EnabledPredicate> SpanCache<P> {
120 /// Custom predicate, default [`CacheConfig`].
121 pub fn with_predicate(capacity: usize, predicate: P) -> (Self, Driver) {
122 Self::with_predicate_and_config(capacity, predicate, CacheConfig::default())
123 }
124
125 /// Custom predicate and custom [`CacheConfig`].
126 pub fn with_predicate_and_config(
127 capacity: usize,
128 predicate: P,
129 config: CacheConfig,
130 ) -> (Self, Driver) {
131 // Silently clamp to [1, 256] and round up to the next power of two.
132 let lane_count = config.lane_count.clamp(1, 256).next_power_of_two();
133 let shard_bits = lane_count.trailing_zeros();
134 let shard_mask = (lane_count as u64) - 1;
135 // Reserve at least one bit at the top so `(shard as u64) << shift`
136 // is well-defined even when lane_count == 1.
137 let shard_shift = 64 - shard_bits.max(1);
138
139 // Bound both channels so a faster producer than consumer (e.g. a
140 // 16-core Graviton with 4 async workers vs. one driver task) can't
141 // grow spillway's internal buffers without bound and exhaust RAM.
142 // `send_many` rejects the whole batch with `Error::Full` when the
143 // limit is exceeded; `flush_pending` discards the rejected drain.
144 // Concurrency matches `lane_count` so each lane's threads tend to
145 // land on their own chute and contend less with peers (spillway's
146 // chute count caps useful per-clone parallelism).
147 let (span_sender, span_receiver) =
148 spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
149 let (event_sender, event_receiver) =
150 spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
151 let shard_capacity = capacity.div_ceil(lane_count);
152 let in_flight: Box<[ShardLane]> = (0..lane_count)
153 .map(|_| ShardLane {
154 slab: Mutex::new(Slab::with_capacity(shard_capacity)),
155 actual_ids: (0..shard_capacity)
156 .map(|_| AtomicU64::new(0))
157 .collect::<Vec<_>>()
158 .into_boxed_slice(),
159 })
160 .collect::<Vec<_>>()
161 .into_boxed_slice();
162
163 // Event pool sharded the same way as the slab (one per lane);
164 // per-shard capacity is generous so steady-state event traffic
165 // never spills. These are tiny structs (~360 B with the
166 // inline-8 FieldList), 256 × 16 lanes = ~1.5 MB worst case.
167 let event_pool = ObjectPool::<EventRecord>::new(lane_count, 256);
168
169 let subscribers = Arc::new(Mutex::new(Vec::new()));
170
171 let cache = SpanCache {
172 in_flight,
173 // Initialise to ID_BATCH so every `fetch_add(ID_BATCH)`
174 // returns a batch-aligned start; that's what makes the
175 // mask-based "cursor at boundary ⇒ refill" check work in
176 // `allocate_actual_id`.
177 id_high_water: AtomicU64::new(ID_BATCH),
178 predicate,
179 shard_capacity,
180 span_sender,
181 event_sender,
182 pending_batch: config.pending_batch,
183 shard_mask,
184 shard_shift,
185 event_pool,
186 subscribers: Arc::clone(&subscribers),
187 };
188 let driver = Driver {
189 span_receiver,
190 event_receiver,
191 capacity,
192 side_events: std::collections::BTreeMap::new(),
193 subscribers,
194 };
195 (cache, driver)
196 }
197
198 /// Number of in-flight slab shards this cache uses.
199 pub fn lane_count(&self) -> usize {
200 self.in_flight.len()
201 }
202
203 #[inline]
204 pub(crate) fn pick_shard(&self) -> usize {
205 (ensure_thread_shard_key() & self.shard_mask) as usize
206 }
207
208 /// Claim the next `actual_id` from this thread's reservation against
209 /// `id_high_water`, refilling via `fetch_add(ID_BATCH)` when the
210 /// reservation is exhausted (`cursor & (ID_BATCH - 1) == 0`).
211 /// Touches the shared atomic ~once per `ID_BATCH` spans rather than
212 /// once per span.
213 #[inline]
214 pub(crate) fn allocate_actual_id(&self) -> u64 {
215 ID_CURSOR.with(|cell| {
216 let cursor = cell.get();
217 if (cursor & (ID_BATCH - 1)) != 0 {
218 cell.set(cursor + 1);
219 cursor
220 } else {
221 let start = self.id_high_water.fetch_add(ID_BATCH, Relaxed);
222 cell.set(start + 1);
223 start
224 }
225 })
226 }
227
228 #[inline]
229 pub(crate) fn encode_tracing_id(&self, shard: usize, slab_idx: usize) -> u64 {
230 ((shard as u64) << self.shard_shift) | ((slab_idx as u64) + SLAB_OFFSET)
231 }
232
233 /// Decode a tracing id into `(shard, slab_idx)`. Returns `None` for
234 /// `DISABLED` or anything outside the encoding scheme.
235 #[inline]
236 pub(crate) fn decode_tracing_id(&self, id: u64) -> Option<(usize, usize)> {
237 if id == DISABLED {
238 return None;
239 }
240 let slab_mask = (1u64 << self.shard_shift) - 1;
241 let raw = id & slab_mask;
242 if raw < SLAB_OFFSET {
243 return None;
244 }
245 let shard = ((id >> self.shard_shift) & self.shard_mask) as usize;
246 Some((shard, (raw - SLAB_OFFSET) as usize))
247 }
248
249 /// Read a slab slot's `actual_id` from the lock-free sidecar.
250 /// `Acquire` pairs with the `Release` store in `new_span` so the
251 /// value is visible once the encoded tracing id has been published
252 /// to the caller.
253 #[inline]
254 pub(crate) fn load_actual_id(&self, shard: usize, slab_idx: usize) -> u64 {
255 self.in_flight[shard].actual_ids[slab_idx].load(Ordering::Acquire)
256 }
257
258 /// Resolve the `actual_id` (i.e. the [`SpanRecord::id`] published
259 /// on the fan-out stream) for an in-flight span addressed by
260 /// its `tracing::span::Id` u64. Lock-free `Acquire` load from the
261 /// per-shard sidecar — does not touch the slab `Mutex`.
262 pub fn actual_id_for(&self, tracing_id: u64) -> Option<u64> {
263 let (shard, slab_idx) = self.decode_tracing_id(tracing_id)?;
264 Some(self.load_actual_id(shard, slab_idx))
265 }
266
267 /// Register a new subscriber. The returned `Receiver` yields
268 /// every closed span the cache produces from this call onward,
269 /// in the driver's commit (close-time) order. The cache holds
270 /// no history — if you need to see spans from before the call,
271 /// subscribe earlier.
272 ///
273 /// Replaces the previous `page(after_id, _)` API, which keyed on
274 /// open-order `actual_id` and so silently dropped spans whenever
275 /// close order diverged from open order — the norm under async
276 /// workloads.
277 ///
278 /// `capacity` is the soft cap on in-flight spans for this
279 /// subscriber. When the receiver falls behind by that much, the
280 /// driver logs and drops a whole batch — slow consumers don't back
281 /// up the pipeline. Drop the receiver to unsubscribe; the driver
282 /// prunes the sender lazily on the next fan-out.
283 pub fn subscribe(&self, capacity: u64) -> spillway::Receiver<SpanRecord> {
284 // concurrency = 1: the driver is the only producer.
285 let (sender, receiver) = spillway::channel_with_capacity_and_concurrency(capacity, 1);
286 #[allow(clippy::expect_used, reason = "poisoned lock")]
287 self.subscribers
288 .lock()
289 .expect("lock must not be poisoned")
290 .push(sender);
291 receiver
292 }
293
294 /// Drains the calling thread's two PENDING buffers (spans + events)
295 /// into their respective spillway channels. Must be called before
296 /// [`Driver::drain_sync`] in tests to ensure all recently-closed
297 /// spans and emitted events are delivered.
298 pub fn flush_pending(&self) {
299 THREAD_SENDERS.with(|sc| {
300 // SAFETY: cell is thread-local and we only hold the &mut for
301 // the duration of this closure; nothing inside re-enters
302 // THREAD_SENDERS.
303 let slot = unsafe { &mut *sc.get() };
304 let cache_addr = self as *const _ as usize;
305 let needs_init = !matches!(slot, Some(t) if t.cache_addr == cache_addr);
306 if needs_init {
307 *slot = Some(ThreadSenders {
308 cache_addr,
309 span: self.span_sender.clone(),
310 event: self.event_sender.clone(),
311 });
312 }
313 // SAFETY: `slot` was just guaranteed to be `Some`.
314 let senders = unsafe { slot.as_ref().unwrap_unchecked() };
315 // Avoid send_many on empty drains — spillway's chute
316 // invariant rejects sender clones that publish without
317 // having ever held content. On `Error::Full`, the rejected
318 // drain is bound to the match arm and dropped, which drops
319 // each unsent record (and runs `ReuseRef::Drop` for events,
320 // returning the EventRecord allocation to the pool).
321 pending_drain_events(|events| {
322 if events.len() > 0
323 && let Err(spillway::Error::Full(_dropped)) = senders.event.send_many(events)
324 {
325 log::debug!("event channel full; dropping a batch — driver is behind");
326 }
327 });
328 pending_drain_spans(|spans| {
329 if spans.len() > 0
330 && let Err(spillway::Error::Full(_dropped)) = senders.span.send_many(spans)
331 {
332 log::debug!("span channel full; dropping a batch — driver is behind");
333 }
334 });
335 });
336 }
337}
338
339// ── Subscriber impl ──────────────────────────────────────────────────────────
340
341impl<P: EnabledPredicate> tracing::Subscriber for SpanCache<P> {
342 fn max_level_hint(&self) -> Option<LevelFilter> {
343 self.predicate.max_level_hint()
344 }
345
346 fn register_callsite(
347 &self,
348 metadata: &'static Metadata<'static>,
349 ) -> tracing::subscriber::Interest {
350 match self.predicate.callsite_enabled(metadata) {
351 Interest::Never => tracing::subscriber::Interest::never(),
352 Interest::Sometimes => tracing::subscriber::Interest::sometimes(),
353 Interest::Always => tracing::subscriber::Interest::always(),
354 }
355 }
356
357 fn enabled(&self, metadata: &Metadata<'_>) -> bool {
358 if matches!(stack_top(), Some(s) if s.tracing_id == DISABLED) {
359 return false;
360 }
361 self.predicate.enabled(metadata)
362 }
363
364 fn event_enabled(&self, event: &tracing::Event<'_>) -> bool {
365 self.predicate.enabled(event.metadata())
366 }
367
368 fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
369 // Step A: resolve parent's actual_id from a side-channel — no
370 // slab lock. Contextual: the parent's `actual_id` is right there
371 // on the SPAN_STACK entry. Explicit: the parent's tracing id
372 // encodes its slab address, and the sidecar holds its actual_id.
373 let parent_actual_id: Option<u64> = if attrs.is_contextual() {
374 match stack_top() {
375 None => return disabled_id(),
376 Some(top) if top.tracing_id == DISABLED => return disabled_id(),
377 Some(top) => Some(top.actual_id),
378 }
379 } else if attrs.is_root() {
380 if stack_top().is_some() {
381 log::warn!("root span created with an active span on the stack; disabling");
382 return disabled_id();
383 }
384 None
385 } else {
386 // `attrs.parent()` is `Some` in this arm: the `else`
387 // branch is reached after `!is_contextual() && !is_root()`,
388 // which leaves only the "explicit parent" case in
389 // tracing's span-attributes model.
390 let Some(parent) = attrs.parent() else {
391 return disabled_id();
392 };
393 let explicit = id_to_u64(parent);
394 match self.decode_tracing_id(explicit) {
395 // Lock-free read from the sidecar: the parent's actual_id
396 // was published by a `Release` store before the parent's
397 // tracing id was returned to the caller, so this `Acquire`
398 // load sees it.
399 Some((p_shard, p_slab)) => Some(self.load_actual_id(p_shard, p_slab)),
400 None => return disabled_id(),
401 }
402 };
403
404 // Step B: predicate check.
405 if !self.predicate.new_span_enabled(attrs) {
406 return disabled_id();
407 }
408
409 // Step C: build record outside the lock so field-visitor work
410 // doesn't extend the critical section.
411 let actual_id = self.allocate_actual_id();
412 let mut record = SpanRecord {
413 id: actual_id,
414 parent_id: parent_actual_id,
415 metadata: attrs.metadata(),
416 fields: FieldList::new(),
417 events: Vec::new(),
418 opened_at: Instant::now(),
419 closed_at: None,
420 };
421 attrs.record(&mut FieldVisitor {
422 fields: &mut record.fields,
423 });
424
425 // Step D: pick our shard, capacity-check + slab.insert under the
426 // Mutex, then drop the guard before the sidecar store and the
427 // pure-arithmetic id encoding. The sidecar is a separate atomic
428 // and the Release/Acquire pair on `actual_ids[slab_idx]` is its
429 // own happens-before — nobody can observe this slot until we
430 // return the tracing id below, so the store doesn't need to be
431 // sequenced under the slab Mutex.
432 let shard = self.pick_shard();
433 let lane = &self.in_flight[shard];
434 let slab_idx = {
435 #[allow(clippy::expect_used, reason = "poisoned lock")]
436 let mut slab = lane.slab.lock().expect("lock must not be poisoned");
437 if slab.len() >= self.shard_capacity {
438 log::warn!(
439 "span shard {shard} full; new span disabled. \
440 Increase capacity or reduce span rate."
441 );
442 return disabled_id();
443 }
444 slab.insert(record)
445 };
446 // The sidecar is sized to `shard_capacity` and indexed by
447 // slab_idx, which is bounded by capacity per the check above.
448 lane.actual_ids[slab_idx].store(actual_id, Ordering::Release);
449 u64_to_id(self.encode_tracing_id(shard, slab_idx))
450 }
451
452 fn record(&self, span: &tracing::span::Id, values: &tracing::span::Record<'_>) {
453 let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(span)) {
454 Some(t) => t,
455 None => return,
456 };
457 #[allow(clippy::expect_used, reason = "poisoned lock")]
458 let mut shard_lock = self.in_flight[shard]
459 .slab
460 .lock()
461 .expect("lock must not be poisoned");
462 if let Some(rec) = shard_lock.get_mut(slab_idx) {
463 values.record(&mut FieldVisitor {
464 fields: &mut rec.fields,
465 });
466 }
467 }
468
469 fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
470
471 fn event(&self, event: &tracing::Event<'_>) {
472 // Resolve the parent's `actual_id` lock-free. Contextual events
473 // get it straight off the SPAN_STACK entry; events with an
474 // explicit parent decode the tracing id and `Acquire`-load from
475 // the per-shard sidecar — no slab lock either way.
476 let parent_actual_id = match event.parent().map(id_to_u64) {
477 Some(tracing_id) => {
478 if tracing_id == DISABLED {
479 log::debug!("event dropped: parent span is disabled");
480 return;
481 }
482 match self.decode_tracing_id(tracing_id) {
483 Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
484 None => return,
485 }
486 }
487 None => match stack_top() {
488 Some(top) if top.tracing_id == DISABLED => {
489 log::debug!("event dropped: parent span is disabled");
490 return;
491 }
492 Some(top) => top.actual_id,
493 None => {
494 log::debug!("event dropped: no active span");
495 return;
496 }
497 },
498 };
499
500 // Acquire a pooled EventRecord, fill it in place. The pooled
501 // FieldList allocation is preserved across reuse.
502 let mut record = self.event_pool.acquire();
503 record.metadata = Some(event.metadata());
504 record.recorded_at = Some(Instant::now());
505 record.fields.clear();
506 event.record(&mut FieldVisitor {
507 fields: &mut record.fields,
508 });
509
510 // Hand off to the driver via the event PENDING — no slab lock
511 // here. The driver attaches to the parent's `events` vec
512 // (directly if the parent's already in the map, or via the
513 // side buffer if the event raced ahead of the span).
514 if pending_push_event(EventMessage {
515 parent_actual_id,
516 record,
517 }) >= self.pending_batch
518 {
519 self.flush_pending();
520 }
521 }
522
523 fn enter(&self, span: &tracing::span::Id) {
524 // Resolve actual_id once, lock-free, and stash it on the stack so
525 // a contextual `new_span` underneath can read its parent's
526 // actual_id without locking the parent's slab.
527 let tracing_id = id_to_u64(span);
528 let actual_id = match self.decode_tracing_id(tracing_id) {
529 Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
530 None => 0, // DISABLED entry — actual_id is never read.
531 };
532 stack_push(StackedSpan {
533 tracing_id,
534 actual_id,
535 });
536 }
537
538 fn exit(&self, _span: &tracing::span::Id) {
539 stack_pop();
540 }
541
542 fn try_close(&self, id: tracing::span::Id) -> bool {
543 let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(&id)) {
544 Some(t) => t,
545 None => return false,
546 };
547
548 // Single slab lookup via `try_remove` (no contains-then-remove
549 // double hash), and `Instant::now()` lives outside the critical
550 // section — only paid on the success path.
551 #[allow(clippy::expect_used, reason = "poisoned lock")]
552 let record = self.in_flight[shard]
553 .slab
554 .lock()
555 .expect("lock must not be poisoned")
556 .try_remove(slab_idx);
557
558 if let Some(mut record) = record {
559 record.closed_at = Some(Instant::now());
560 if pending_push_span(record) >= self.pending_batch {
561 self.flush_pending();
562 }
563 true
564 } else {
565 false
566 }
567 }
568}