Skip to main content

tracing_cache/
cache.rs

1//! The `SpanCache` subscriber and its in-flight slab shards.
2//!
//! Open spans live in `Box<[ShardLane]>`, each lane a `Mutex<Slab>` plus
//! a parallel sidecar of `actual_id`s readable without locking.  When a
//! span closes, its `SpanRecord` is moved to the per-thread `PENDING`
//! buffer (`thread_state::pending_push_span`) and eventually flushed to
//! the spillway channel that `Driver` consumes.
8
9use std::collections::BTreeMap;
10use std::ops::Bound::{Excluded, Unbounded};
11use std::sync::atomic::{AtomicU64, Ordering, Ordering::Relaxed};
12use std::sync::{Arc, Mutex, RwLock};
13use std::time::Instant;
14
15use slab::Slab;
16use tracing::metadata::LevelFilter;
17use tracing::{Level, Metadata};
18
19use crate::config::CacheConfig;
20use crate::driver::{Driver, EventMessage};
21use crate::id_encoding::{DISABLED, SLAB_OFFSET, disabled_id, id_to_u64, u64_to_id};
22use crate::object_pool::ObjectPool;
23use crate::predicate::{EnabledPredicate, Interest, LevelPredicate};
24use crate::record::{EventRecord, FieldList, FieldVisitor, SpanRecord};
25use crate::thread_state::{
26    ID_BATCH, ID_CURSOR, StackedSpan, THREAD_SENDERS, ThreadSenders, ensure_thread_shard_key,
27    pending_drain_events, pending_drain_spans, pending_push_event, pending_push_span, stack_pop,
28    stack_push, stack_top,
29};
30
/// One slab shard plus a parallel sidecar of `actual_id`s that's readable
/// without locking the slab.  The sidecar is sized to `shard_capacity`
/// (the slab's growth bound) and indexed by `slab_idx`.  `new_span`
/// writes the new span's `actual_id` with `Release` ordering before
/// publishing the resulting tracing id; readers (`load_actual_id`, used
/// by `enter`, `event` and explicit-parent `new_span`) load with
/// `Acquire` to see that value.
///
/// Nothing in this file resets a sidecar slot when its span closes, so a
/// slot keeps the last `actual_id` stored in it until the slab index is
/// handed out again.
pub(crate) struct ShardLane {
    /// Open spans of this lane, addressed by slab index.
    pub(crate) slab: Mutex<Slab<SpanRecord>>,
    /// `actual_ids[slab_idx]` holds the `actual_id` of the record most
    /// recently inserted at `slab_idx`; every slot starts out as 0.
    pub(crate) actual_ids: Box<[AtomicU64]>,
}
41
/// A `tracing::Subscriber` that holds spans in memory for inspection.
///
/// Open spans live in a sharded `Box<[ShardLane]>` (each lane is a
/// `Mutex<Slab<SpanRecord>>` plus a lock-free sidecar of `actual_id`s),
/// with the lane count set by [`CacheConfig::lane_count`] (default
/// [`crate::DEFAULT_LANE_COUNT`]).  The shard is picked by a thread-local
/// key; the slab gives an O(1) cache-friendly index, and the user-facing
/// `tracing::span::Id` packs `(shard, slab_idx + SLAB_OFFSET)` into a
/// single u64 so `SPAN_STACK` push/pop and trait-method dispatch don't
/// need a separate lookup.  When a span closes it moves to a per-thread
/// buffer and is flushed to the [`Driver`] via a spillway channel.
///
/// `SpanRecord.id` is an `actual_id` (separate from the tracing id) that
/// serves as the `BTreeMap` key, since slab indices are reused.  IDs are
/// monotonic within a thread's `ID_BATCH`-sized reservation; across
/// threads they interleave, so map order reflects allocation order
/// per-thread but not strict global creation order.
///
/// Create with [`SpanCache::new`] / [`SpanCache::with_predicate`]
/// (defaults) or [`SpanCache::with_config`] /
/// [`SpanCache::with_predicate_and_config`] (custom batch sizes & lane
/// count).  Each returns `(SpanCache, Driver)`; spawn the [`Driver`] as
/// a background task to commit closed spans.
pub struct SpanCache<P: EnabledPredicate = LevelPredicate> {
    /// One lane per shard; the length is the (power-of-two) lane count.
    pub(crate) in_flight: Box<[ShardLane]>,
    /// Closed spans keyed by `actual_id`.  The same `Arc` is handed to
    /// the [`Driver`] at construction.
    pub(crate) map: Arc<RwLock<BTreeMap<u64, SpanRecord>>>,
    /// High-water mark for the `actual_id` space (the `SpanRecord.id` /
    /// `BTreeMap` key, disjoint from the encoded tracing id space).
    /// Threads claim `ID_BATCH`-sized slices via `fetch_add` and hand IDs
    /// out from a thread-local reservation, so this counter is touched
    /// roughly once per `ID_BATCH` spans rather than once per span.
    pub(crate) id_high_water: AtomicU64,
    /// Decides which callsites, spans and events are recorded.
    pub(crate) predicate: P,
    /// Per-shard capacity.  Total open-span budget is
    /// `shard_capacity * lane_count`.
    pub(crate) shard_capacity: usize,
    /// Template sender for closed spans; `flush_pending` clones it into
    /// the calling thread's `THREAD_SENDERS` slot.
    pub(crate) span_sender: spillway::Sender<SpanRecord>,
    /// Template sender for events; cloned per thread like `span_sender`.
    pub(crate) event_sender: spillway::Sender<EventMessage>,
    /// Pending-buffer length at which `event()` / `try_close()` call
    /// `flush_pending` themselves.
    pub(crate) pending_batch: usize,
    pub(crate) shard_mask: u64,  // lane_count - 1 (lane_count is a power of two)
    pub(crate) shard_shift: u32, // 64 - max(log2(lane_count), 1); shard at top of id
    /// Shared pool of pre-allocated `EventRecord`s.  `event()` acquires
    /// a record from the pool, fills it in place and hands it to the
    /// driver inside an `EventMessage`; per the comments in `event()`
    /// the driver then attaches it to the parent span's events.  When the
    /// `SpanRecord` finally drops (BTreeMap eviction), each `ReuseRef`
    /// returns its allocation to the pool — amortising away the
    /// per-event `Box::new` cost the previous `Vec<EventRecord>` paid.
    pub(crate) event_pool: Arc<ObjectPool<EventRecord>>,
}
90
91impl SpanCache<LevelPredicate> {
92    /// Default predicate (TRACE), default config.
93    pub fn new(capacity: usize) -> (Self, Driver) {
94        Self::with_predicate(capacity, LevelPredicate::new(Level::TRACE))
95    }
96
97    /// Default predicate (TRACE) with a custom [`CacheConfig`].
98    pub fn with_config(capacity: usize, config: CacheConfig) -> (Self, Driver) {
99        Self::with_predicate_and_config(capacity, LevelPredicate::new(Level::TRACE), config)
100    }
101}
102
103impl<P: EnabledPredicate> SpanCache<P> {
104    /// Custom predicate, default [`CacheConfig`].
105    pub fn with_predicate(capacity: usize, predicate: P) -> (Self, Driver) {
106        Self::with_predicate_and_config(capacity, predicate, CacheConfig::default())
107    }
108
109    /// Custom predicate and custom [`CacheConfig`].
110    pub fn with_predicate_and_config(
111        capacity: usize,
112        predicate: P,
113        config: CacheConfig,
114    ) -> (Self, Driver) {
115        // Silently clamp to [1, 256] and round up to the next power of two.
116        let lane_count = config.lane_count.clamp(1, 256).next_power_of_two();
117        let shard_bits = lane_count.trailing_zeros();
118        let shard_mask = (lane_count as u64) - 1;
119        // Reserve at least one bit at the top so `(shard as u64) << shift`
120        // is well-defined even when lane_count == 1.
121        let shard_shift = 64 - shard_bits.max(1);
122
123        // Bound both channels so a faster producer than consumer (e.g. a
124        // 16-core Graviton with 4 async workers vs. one driver task) can't
125        // grow spillway's internal buffers without bound and exhaust RAM.
126        // `send_many` rejects the whole batch with `Error::Full` when the
127        // limit is exceeded; `flush_pending` discards the rejected drain.
128        // Concurrency matches `lane_count` so each lane's threads tend to
129        // land on their own chute and contend less with peers (spillway's
130        // chute count caps useful per-clone parallelism).
131        let (span_sender, span_receiver) =
132            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
133        let (event_sender, event_receiver) =
134            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
135        let map = Arc::new(RwLock::new(BTreeMap::new()));
136        let shard_capacity = capacity.div_ceil(lane_count);
137        let in_flight: Box<[ShardLane]> = (0..lane_count)
138            .map(|_| ShardLane {
139                slab: Mutex::new(Slab::with_capacity(shard_capacity)),
140                actual_ids: (0..shard_capacity)
141                    .map(|_| AtomicU64::new(0))
142                    .collect::<Vec<_>>()
143                    .into_boxed_slice(),
144            })
145            .collect::<Vec<_>>()
146            .into_boxed_slice();
147
148        // Event pool sharded the same way as the slab (one per lane);
149        // per-shard capacity is generous so steady-state event traffic
150        // never spills.  These are tiny structs (~360 B with the
151        // inline-8 FieldList), 256 × 16 lanes = ~1.5 MB worst case.
152        let event_pool = ObjectPool::<EventRecord>::new(lane_count, 256);
153
154        let cache = SpanCache {
155            in_flight,
156            map: Arc::clone(&map),
157            // Initialise to ID_BATCH so every `fetch_add(ID_BATCH)`
158            // returns a batch-aligned start; that's what makes the
159            // mask-based "cursor at boundary ⇒ refill" check work in
160            // `allocate_actual_id`.
161            id_high_water: AtomicU64::new(ID_BATCH),
162            predicate,
163            shard_capacity,
164            span_sender,
165            event_sender,
166            pending_batch: config.pending_batch,
167            shard_mask,
168            shard_shift,
169            event_pool,
170        };
171        let driver = Driver {
172            map,
173            span_receiver,
174            event_receiver,
175            capacity,
176            side_events: std::collections::BTreeMap::new(),
177        };
178        (cache, driver)
179    }
180
181    /// Number of in-flight slab shards this cache uses.
182    pub fn lane_count(&self) -> usize {
183        self.in_flight.len()
184    }
185
186    #[inline]
187    pub(crate) fn pick_shard(&self) -> usize {
188        (ensure_thread_shard_key() & self.shard_mask) as usize
189    }
190
191    /// Claim the next `actual_id` from this thread's reservation against
192    /// `id_high_water`, refilling via `fetch_add(ID_BATCH)` when the
193    /// reservation is exhausted (`cursor & (ID_BATCH - 1) == 0`).
194    /// Touches the shared atomic ~once per `ID_BATCH` spans rather than
195    /// once per span.
196    #[inline]
197    pub(crate) fn allocate_actual_id(&self) -> u64 {
198        ID_CURSOR.with(|cell| {
199            let cursor = cell.get();
200            if (cursor & (ID_BATCH - 1)) != 0 {
201                cell.set(cursor + 1);
202                cursor
203            } else {
204                let start = self.id_high_water.fetch_add(ID_BATCH, Relaxed);
205                cell.set(start + 1);
206                start
207            }
208        })
209    }
210
211    #[inline]
212    pub(crate) fn encode_tracing_id(&self, shard: usize, slab_idx: usize) -> u64 {
213        ((shard as u64) << self.shard_shift) | ((slab_idx as u64) + SLAB_OFFSET)
214    }
215
216    /// Decode a tracing id into `(shard, slab_idx)`.  Returns `None` for
217    /// `DISABLED` or anything outside the encoding scheme.
218    #[inline]
219    pub(crate) fn decode_tracing_id(&self, id: u64) -> Option<(usize, usize)> {
220        if id == DISABLED {
221            return None;
222        }
223        let slab_mask = (1u64 << self.shard_shift) - 1;
224        let raw = id & slab_mask;
225        if raw < SLAB_OFFSET {
226            return None;
227        }
228        let shard = ((id >> self.shard_shift) & self.shard_mask) as usize;
229        Some((shard, (raw - SLAB_OFFSET) as usize))
230    }
231
232    /// Read a slab slot's `actual_id` from the lock-free sidecar.
233    /// `Acquire` pairs with the `Release` store in `new_span` so the
234    /// value is visible once the encoded tracing id has been published
235    /// to the caller.
236    #[inline]
237    pub(crate) fn load_actual_id(&self, shard: usize, slab_idx: usize) -> u64 {
238        self.in_flight[shard].actual_ids[slab_idx].load(Ordering::Acquire)
239    }
240
241    /// Returns a closed span by its actual_id (`SpanRecord.id`).  This is
242    /// the id stored in `parent_id` and used as the BTreeMap key.
243    /// Only closed spans are reachable here.
244    pub fn get_span(&self, actual_id: u64) -> Option<SpanRecord> {
245        #[allow(clippy::expect_used, reason = "poisoned lock")]
246        let map = self.map.read().expect("lock must not be poisoned");
247        map.get(&actual_id).cloned()
248    }
249
250    /// Drop every closed span currently in the BTreeMap.  Called by
251    /// the host when the cache-recording level transitions to `OFF`
252    /// so a paused host doesn't keep stale data around to confuse the
253    /// next session.  In-flight spans (still open in the slabs) are
254    /// not affected; if any close after this call they'll repopulate
255    /// the map as normal.
256    pub fn clear(&self) {
257        #[allow(clippy::expect_used, reason = "poisoned lock")]
258        let mut map = self.map.write().expect("lock must not be poisoned");
259        map.clear();
260    }
261
262    /// Resolve the `actual_id` (i.e. the [`SpanRecord::id`] used as the
263    /// `BTreeMap` key after close) for an in-flight span addressed by
264    /// its `tracing::span::Id` u64.  Lock-free `Acquire` load from the
265    /// per-shard sidecar — does not touch the slab `Mutex`.
266    pub fn actual_id_for(&self, tracing_id: u64) -> Option<u64> {
267        let (shard, slab_idx) = self.decode_tracing_id(tracing_id)?;
268        Some(self.load_actual_id(shard, slab_idx))
269    }
270
271    /// Returns closed spans in ascending actual_id order.  Open spans are
272    /// not included; call [`Self::flush_pending`] + [`Driver::drain_sync`]
273    /// first if you need just-closed spans to appear.
274    pub fn page(&self, after_id: u64, limit: usize) -> Vec<SpanRecord> {
275        #[allow(clippy::expect_used, reason = "poisoned lock")]
276        let map = self.map.read().expect("lock must not be poisoned");
277        if after_id == 0 {
278            map.values().take(limit).cloned().collect()
279        } else {
280            map.range((Excluded(after_id), Unbounded))
281                .take(limit)
282                .map(|(_, v)| v.clone())
283                .collect()
284        }
285    }
286
    /// Drains the calling thread's two PENDING buffers (spans + events)
    /// into their respective spillway channels.  Must be called before
    /// [`Driver::drain_sync`] in tests to ensure all recently-closed
    /// spans and emitted events are delivered.
    ///
    /// Also invoked by `event()` and `try_close()` in this file once a
    /// pending buffer reaches `pending_batch` entries.  Events are sent
    /// before spans.  A batch rejected with `Error::Full` is dropped
    /// after a debug log; any other error returned by `send_many` is
    /// ignored without logging.
    pub fn flush_pending(&self) {
        THREAD_SENDERS.with(|sc| {
            // SAFETY: cell is thread-local and we only hold the &mut for
            // the duration of this closure; nothing inside re-enters
            // THREAD_SENDERS.
            let slot = unsafe { &mut *sc.get() };
            // The per-thread sender pair is tagged with the address of
            // the cache it was cloned from; a missing entry or one made
            // for a different address is replaced with fresh clones.
            let cache_addr = self as *const _ as usize;
            let needs_init = !matches!(slot, Some(t) if t.cache_addr == cache_addr);
            if needs_init {
                *slot = Some(ThreadSenders {
                    cache_addr,
                    span: self.span_sender.clone(),
                    event: self.event_sender.clone(),
                });
            }
            // SAFETY: `slot` was just guaranteed to be `Some`.
            let senders = unsafe { slot.as_ref().unwrap_unchecked() };
            // Avoid send_many on empty drains — spillway's chute
            // invariant rejects sender clones that publish without
            // having ever held content.  On `Error::Full`, the rejected
            // drain is bound to the match arm and dropped, which drops
            // each unsent record (and runs `ReuseRef::Drop` for events,
            // returning the EventRecord allocation to the pool).
            pending_drain_events(|events| {
                if events.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.event.send_many(events)
                {
                    log::debug!("event channel full; dropping a batch — driver is behind");
                }
            });
            pending_drain_spans(|spans| {
                if spans.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.span.send_many(spans)
                {
                    log::debug!("span channel full; dropping a batch — driver is behind");
                }
            });
        });
    }
330}
331
332// ── Subscriber impl ──────────────────────────────────────────────────────────
333
334impl<P: EnabledPredicate> tracing::Subscriber for SpanCache<P> {
335    fn max_level_hint(&self) -> Option<LevelFilter> {
336        self.predicate.max_level_hint()
337    }
338
339    fn register_callsite(
340        &self,
341        metadata: &'static Metadata<'static>,
342    ) -> tracing::subscriber::Interest {
343        match self.predicate.callsite_enabled(metadata) {
344            Interest::Never => tracing::subscriber::Interest::never(),
345            Interest::Sometimes => tracing::subscriber::Interest::sometimes(),
346            Interest::Always => tracing::subscriber::Interest::always(),
347        }
348    }
349
350    fn enabled(&self, metadata: &Metadata<'_>) -> bool {
351        if matches!(stack_top(), Some(s) if s.tracing_id == DISABLED) {
352            return false;
353        }
354        self.predicate.enabled(metadata)
355    }
356
357    fn event_enabled(&self, event: &tracing::Event<'_>) -> bool {
358        self.predicate.enabled(event.metadata())
359    }
360
361    fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
362        // Step A: resolve parent's actual_id from a side-channel — no
363        // slab lock.  Contextual: the parent's `actual_id` is right there
364        // on the SPAN_STACK entry.  Explicit: the parent's tracing id
365        // encodes its slab address, and the sidecar holds its actual_id.
366        let parent_actual_id: Option<u64> = if attrs.is_contextual() {
367            match stack_top() {
368                None => return disabled_id(),
369                Some(top) if top.tracing_id == DISABLED => return disabled_id(),
370                Some(top) => Some(top.actual_id),
371            }
372        } else if attrs.is_root() {
373            if stack_top().is_some() {
374                log::warn!("root span created with an active span on the stack; disabling");
375                return disabled_id();
376            }
377            None
378        } else {
379            // `attrs.parent()` is `Some` in this arm: the `else`
380            // branch is reached after `!is_contextual() && !is_root()`,
381            // which leaves only the "explicit parent" case in
382            // tracing's span-attributes model.
383            let Some(parent) = attrs.parent() else {
384                return disabled_id();
385            };
386            let explicit = id_to_u64(parent);
387            match self.decode_tracing_id(explicit) {
388                // Lock-free read from the sidecar: the parent's actual_id
389                // was published by a `Release` store before the parent's
390                // tracing id was returned to the caller, so this `Acquire`
391                // load sees it.
392                Some((p_shard, p_slab)) => Some(self.load_actual_id(p_shard, p_slab)),
393                None => return disabled_id(),
394            }
395        };
396
397        // Step B: predicate check.
398        if !self.predicate.new_span_enabled(attrs) {
399            return disabled_id();
400        }
401
402        // Step C: build record outside the lock so field-visitor work
403        // doesn't extend the critical section.
404        let actual_id = self.allocate_actual_id();
405        let mut record = SpanRecord {
406            id: actual_id,
407            parent_id: parent_actual_id,
408            metadata: attrs.metadata(),
409            fields: FieldList::new(),
410            events: Vec::new(),
411            opened_at: Instant::now(),
412            closed_at: None,
413        };
414        attrs.record(&mut FieldVisitor {
415            fields: &mut record.fields,
416        });
417
418        // Step D: pick our shard, capacity-check + slab.insert under the
419        // Mutex, then drop the guard before the sidecar store and the
420        // pure-arithmetic id encoding.  The sidecar is a separate atomic
421        // and the Release/Acquire pair on `actual_ids[slab_idx]` is its
422        // own happens-before — nobody can observe this slot until we
423        // return the tracing id below, so the store doesn't need to be
424        // sequenced under the slab Mutex.
425        let shard = self.pick_shard();
426        let lane = &self.in_flight[shard];
427        let slab_idx = {
428            #[allow(clippy::expect_used, reason = "poisoned lock")]
429            let mut slab = lane.slab.lock().expect("lock must not be poisoned");
430            if slab.len() >= self.shard_capacity {
431                log::warn!(
432                    "span shard {shard} full; new span disabled. \
433                     Increase capacity or reduce span rate."
434                );
435                return disabled_id();
436            }
437            slab.insert(record)
438        };
439        // The sidecar is sized to `shard_capacity` and indexed by
440        // slab_idx, which is bounded by capacity per the check above.
441        lane.actual_ids[slab_idx].store(actual_id, Ordering::Release);
442        u64_to_id(self.encode_tracing_id(shard, slab_idx))
443    }
444
445    fn record(&self, span: &tracing::span::Id, values: &tracing::span::Record<'_>) {
446        let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(span)) {
447            Some(t) => t,
448            None => return,
449        };
450        #[allow(clippy::expect_used, reason = "poisoned lock")]
451        let mut shard_lock = self.in_flight[shard]
452            .slab
453            .lock()
454            .expect("lock must not be poisoned");
455        if let Some(rec) = shard_lock.get_mut(slab_idx) {
456            values.record(&mut FieldVisitor {
457                fields: &mut rec.fields,
458            });
459        }
460    }
461
    /// Follows-from relationships are not tracked by this cache; the
    /// call is accepted and ignored.
    fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
463
464    fn event(&self, event: &tracing::Event<'_>) {
465        // Resolve the parent's `actual_id` lock-free.  Contextual events
466        // get it straight off the SPAN_STACK entry; events with an
467        // explicit parent decode the tracing id and `Acquire`-load from
468        // the per-shard sidecar — no slab lock either way.
469        let parent_actual_id = match event.parent().map(id_to_u64) {
470            Some(tracing_id) => {
471                if tracing_id == DISABLED {
472                    log::debug!("event dropped: parent span is disabled");
473                    return;
474                }
475                match self.decode_tracing_id(tracing_id) {
476                    Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
477                    None => return,
478                }
479            }
480            None => match stack_top() {
481                Some(top) if top.tracing_id == DISABLED => {
482                    log::debug!("event dropped: parent span is disabled");
483                    return;
484                }
485                Some(top) => top.actual_id,
486                None => {
487                    log::debug!("event dropped: no active span");
488                    return;
489                }
490            },
491        };
492
493        // Acquire a pooled EventRecord, fill it in place.  The pooled
494        // FieldList allocation is preserved across reuse.
495        let mut record = self.event_pool.acquire();
496        record.metadata = Some(event.metadata());
497        record.recorded_at = Some(Instant::now());
498        record.fields.clear();
499        event.record(&mut FieldVisitor {
500            fields: &mut record.fields,
501        });
502
503        // Hand off to the driver via the event PENDING — no slab lock
504        // here.  The driver attaches to the parent's `events` vec
505        // (directly if the parent's already in the map, or via the
506        // side buffer if the event raced ahead of the span).
507        if pending_push_event(EventMessage {
508            parent_actual_id,
509            record,
510        }) >= self.pending_batch
511        {
512            self.flush_pending();
513        }
514    }
515
516    fn enter(&self, span: &tracing::span::Id) {
517        // Resolve actual_id once, lock-free, and stash it on the stack so
518        // a contextual `new_span` underneath can read its parent's
519        // actual_id without locking the parent's slab.
520        let tracing_id = id_to_u64(span);
521        let actual_id = match self.decode_tracing_id(tracing_id) {
522            Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
523            None => 0, // DISABLED entry — actual_id is never read.
524        };
525        stack_push(StackedSpan {
526            tracing_id,
527            actual_id,
528        });
529    }
530
    /// Pop the top entry of this thread's span stack.  The `_span`
    /// argument is not consulted, so this relies on `enter`/`exit` calls
    /// being properly nested on each thread.
    fn exit(&self, _span: &tracing::span::Id) {
        stack_pop();
    }
534
535    fn try_close(&self, id: tracing::span::Id) -> bool {
536        let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(&id)) {
537            Some(t) => t,
538            None => return false,
539        };
540
541        // Single slab lookup via `try_remove` (no contains-then-remove
542        // double hash), and `Instant::now()` lives outside the critical
543        // section — only paid on the success path.
544        #[allow(clippy::expect_used, reason = "poisoned lock")]
545        let record = self.in_flight[shard]
546            .slab
547            .lock()
548            .expect("lock must not be poisoned")
549            .try_remove(slab_idx);
550
551        if let Some(mut record) = record {
552            record.closed_at = Some(Instant::now());
553            if pending_push_span(record) >= self.pending_batch {
554                self.flush_pending();
555            }
556            true
557        } else {
558            false
559        }
560    }
561}