Skip to main content

tracing_cache/
cache.rs

1//! The `SpanCache` subscriber and its in-flight slab shards.
2//!
//! Open spans live in `Box<[ShardLane]>`, each lane a `Mutex<Slab>` plus
//! a parallel sidecar of `actual_id`s readable without locking.  When a
//! span closes, its `SpanRecord` is moved to the per-thread `PENDING`
//! buffer (`thread_state::pending_push_span`) and eventually flushed to
//! the spillway channel that `Driver` consumes.
8//!
9//! Closed spans are streamed to consumers via [`SpanCache::subscribe`]
10//! — the driver fans each freshly-committed `SpanRecord` out to every
11//! live subscriber and then drops it.  The cache holds no historical
12//! state, so consumers that need history must subscribe before the
13//! spans of interest are emitted.  Subscribers that have dropped their
14//! receiver are pruned lazily on the next send.
15
16use std::sync::atomic::{AtomicU64, Ordering, Ordering::Relaxed};
17use std::sync::{Arc, Mutex};
18use std::time::Instant;
19
20use slab::Slab;
21use tracing::metadata::LevelFilter;
22use tracing::{Level, Metadata};
23
24use crate::config::CacheConfig;
25use crate::driver::{Driver, EventMessage};
26use crate::id_encoding::{DISABLED, SLAB_OFFSET, disabled_id, id_to_u64, u64_to_id};
27use crate::object_pool::ObjectPool;
28use crate::predicate::{EnabledPredicate, Interest, LevelPredicate};
29use crate::record::{EventRecord, FieldList, FieldVisitor, SpanRecord};
30use crate::thread_state::{
31    ID_BATCH, ID_CURSOR, StackedSpan, THREAD_SENDERS, ThreadSenders, ensure_thread_shard_key,
32    pending_drain_events, pending_drain_spans, pending_push_event, pending_push_span, stack_pop,
33    stack_push, stack_top,
34};
35
/// One slab shard plus a parallel sidecar of `actual_id`s that is
/// readable without locking the slab.
///
/// The sidecar is sized to `shard_capacity` (the slab's growth bound)
/// and indexed by `slab_idx`.  `new_span` stores the new span's
/// `actual_id` with `Release` ordering before it returns the encoded
/// tracing id; readers go through `load_actual_id` (used by `enter`,
/// `event`, explicit-parent `new_span` and `actual_id_for`), which
/// loads with `Acquire` to see that value.
///
/// Nothing in this file resets a sidecar slot when its span closes, so
/// a slot keeps the last id stored in it until the slab index is
/// reused by a later `new_span`.
pub(crate) struct ShardLane {
    /// Open spans of this lane, keyed by slab index.
    pub(crate) slab: Mutex<Slab<SpanRecord>>,
    /// One atomic per slab index holding the `actual_id` last stored
    /// for that index; every slot starts at 0.
    pub(crate) actual_ids: Box<[AtomicU64]>,
}
46
/// A `tracing::Subscriber` that funnels closed spans to live consumers.
///
/// Open spans live in a sharded `Box<[ShardLane]>` (each lane a
/// `Mutex<Slab<SpanRecord>>` plus a lock-free `actual_id` sidecar), with
/// the lane count set by [`CacheConfig::lane_count`] (default
/// [`crate::DEFAULT_LANE_COUNT`]).  The shard is picked by a thread-local
/// key; the slab gives an O(1) cache-friendly index, and the user-facing
/// `tracing::span::Id` packs the shard into the top bits and
/// `slab_idx + SLAB_OFFSET` into the low bits of a single u64 so
/// `SPAN_STACK` push/pop and trait-method dispatch don't need a separate
/// lookup.  When a span closes it moves to a per-thread buffer and is
/// flushed to the [`Driver`] via a spillway channel.
///
/// The cache itself holds no closed spans — the driver fans each one
/// out to [`SpanCache::subscribe`] receivers and then drops it.  Late
/// events (an event whose parent span has already been fanned out)
/// have nowhere to land and are dropped.
///
/// `SpanRecord.id` is an `actual_id` (separate from the tracing id),
/// monotonic within a thread's `ID_BATCH`-sized reservation; across
/// threads they interleave, so subscriber-observed order is
/// driver-commit (close-time) order, not strict global open order.
///
/// Create with [`SpanCache::new`] / [`SpanCache::with_predicate`]
/// (defaults) or [`SpanCache::with_config`] /
/// [`SpanCache::with_predicate_and_config`] (custom batch sizes & lane
/// count).  Each returns `(SpanCache, Driver)`; spawn the [`Driver`] as
/// a background task to fan closed spans out to subscribers.
pub struct SpanCache<P: EnabledPredicate = LevelPredicate> {
    /// The in-flight lanes; its length is the (power-of-two) lane count.
    pub(crate) in_flight: Box<[ShardLane]>,
    /// High-water mark for the `actual_id` space — the `SpanRecord.id`
    /// stored on every record and referenced via `parent_id`, disjoint
    /// from the encoded tracing id space.  Threads claim `ID_BATCH`-
    /// sized slices via `fetch_add` and hand IDs out from a thread-
    /// local reservation, so this counter is touched roughly once per
    /// `ID_BATCH` spans rather than once per span.
    pub(crate) id_high_water: AtomicU64,
    /// Decides which callsites, spans and events are recorded.
    pub(crate) predicate: P,
    /// Per-shard capacity (`capacity.div_ceil(lane_count)`).  Total
    /// open-span budget is `shard_capacity * lane_count`.
    pub(crate) shard_capacity: usize,
    /// Producer side of the closed-span channel consumed by the [`Driver`].
    pub(crate) span_sender: spillway::Sender<SpanRecord>,
    /// Producer side of the event channel consumed by the [`Driver`].
    pub(crate) event_sender: spillway::Sender<EventMessage>,
    /// Per-thread pending-buffer length at which `event` / `try_close`
    /// call [`SpanCache::flush_pending`].
    pub(crate) pending_batch: usize,
    /// `lane_count - 1`; masks a shard key or the top bits of a
    /// tracing id down to a lane index.
    pub(crate) shard_mask: u64,
    /// `64 - max(log2(lane_count), 1)`: the shard sits at the top of
    /// the tracing id, and at least one bit is always reserved so the
    /// shift stays below 64 even when `lane_count == 1`.
    pub(crate) shard_shift: u32,
    /// Shared pool of pre-allocated `EventRecord`s.  `event()` acquires
    /// a record from the pool, fills it in place, and hands it to the
    /// driver inside an `EventMessage`.  Per the pool's design the
    /// allocation is returned to the pool when the record is finally
    /// dropped — amortising away the per-event `Box::new` cost.
    pub(crate) event_pool: Arc<ObjectPool<EventRecord>>,
    /// Live consumers of closed spans.  [`SpanCache::subscribe`] pushes a
    /// fresh `Sender` here and hands the matching `Receiver` to the
    /// caller; the [`Driver`] holds an `Arc` clone of this `Mutex` and
    /// fans out each committed `SpanRecord` to every entry.  Senders
    /// that return `Error::Closed` (receiver dropped) are removed by
    /// the driver on the next fan-out.
    pub(crate) subscribers: Arc<Mutex<Vec<spillway::Sender<SpanRecord>>>>,
}
106
107impl SpanCache<LevelPredicate> {
108    /// Default predicate (TRACE), default config.
109    pub fn new(capacity: usize) -> (Self, Driver) {
110        Self::with_predicate(capacity, LevelPredicate::new(Level::TRACE))
111    }
112
113    /// Default predicate (TRACE) with a custom [`CacheConfig`].
114    pub fn with_config(capacity: usize, config: CacheConfig) -> (Self, Driver) {
115        Self::with_predicate_and_config(capacity, LevelPredicate::new(Level::TRACE), config)
116    }
117}
118
119impl<P: EnabledPredicate> SpanCache<P> {
120    /// Custom predicate, default [`CacheConfig`].
121    pub fn with_predicate(capacity: usize, predicate: P) -> (Self, Driver) {
122        Self::with_predicate_and_config(capacity, predicate, CacheConfig::default())
123    }
124
125    /// Custom predicate and custom [`CacheConfig`].
126    pub fn with_predicate_and_config(
127        capacity: usize,
128        predicate: P,
129        config: CacheConfig,
130    ) -> (Self, Driver) {
131        // Silently clamp to [1, 256] and round up to the next power of two.
132        let lane_count = config.lane_count.clamp(1, 256).next_power_of_two();
133        let shard_bits = lane_count.trailing_zeros();
134        let shard_mask = (lane_count as u64) - 1;
135        // Reserve at least one bit at the top so `(shard as u64) << shift`
136        // is well-defined even when lane_count == 1.
137        let shard_shift = 64 - shard_bits.max(1);
138
139        // Bound both channels so a faster producer than consumer (e.g. a
140        // 16-core Graviton with 4 async workers vs. one driver task) can't
141        // grow spillway's internal buffers without bound and exhaust RAM.
142        // `send_many` rejects the whole batch with `Error::Full` when the
143        // limit is exceeded; `flush_pending` discards the rejected drain.
144        // Concurrency matches `lane_count` so each lane's threads tend to
145        // land on their own chute and contend less with peers (spillway's
146        // chute count caps useful per-clone parallelism).
147        let (span_sender, span_receiver) =
148            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
149        let (event_sender, event_receiver) =
150            spillway::channel_with_capacity_and_concurrency(config.channel_capacity, lane_count);
151        let shard_capacity = capacity.div_ceil(lane_count);
152        let in_flight: Box<[ShardLane]> = (0..lane_count)
153            .map(|_| ShardLane {
154                slab: Mutex::new(Slab::with_capacity(shard_capacity)),
155                actual_ids: (0..shard_capacity)
156                    .map(|_| AtomicU64::new(0))
157                    .collect::<Vec<_>>()
158                    .into_boxed_slice(),
159            })
160            .collect::<Vec<_>>()
161            .into_boxed_slice();
162
163        // Event pool sharded the same way as the slab (one per lane);
164        // per-shard capacity is generous so steady-state event traffic
165        // never spills.  These are tiny structs (~360 B with the
166        // inline-8 FieldList), 256 × 16 lanes = ~1.5 MB worst case.
167        let event_pool = ObjectPool::<EventRecord>::new(lane_count, 256);
168
169        let subscribers = Arc::new(Mutex::new(Vec::new()));
170
171        let cache = SpanCache {
172            in_flight,
173            // Initialise to ID_BATCH so every `fetch_add(ID_BATCH)`
174            // returns a batch-aligned start; that's what makes the
175            // mask-based "cursor at boundary ⇒ refill" check work in
176            // `allocate_actual_id`.
177            id_high_water: AtomicU64::new(ID_BATCH),
178            predicate,
179            shard_capacity,
180            span_sender,
181            event_sender,
182            pending_batch: config.pending_batch,
183            shard_mask,
184            shard_shift,
185            event_pool,
186            subscribers: Arc::clone(&subscribers),
187        };
188        let driver = Driver {
189            span_receiver,
190            event_receiver,
191            capacity,
192            side_events: std::collections::BTreeMap::new(),
193            subscribers,
194        };
195        (cache, driver)
196    }
197
    /// Number of in-flight slab shards this cache uses.
    ///
    /// This is the length of the lane array built by the constructor,
    /// i.e. the configured lane count after clamping to `[1, 256]` and
    /// rounding up to a power of two.
    pub fn lane_count(&self) -> usize {
        self.in_flight.len()
    }
202
203    #[inline]
204    pub(crate) fn pick_shard(&self) -> usize {
205        (ensure_thread_shard_key() & self.shard_mask) as usize
206    }
207
208    /// Claim the next `actual_id` from this thread's reservation against
209    /// `id_high_water`, refilling via `fetch_add(ID_BATCH)` when the
210    /// reservation is exhausted (`cursor & (ID_BATCH - 1) == 0`).
211    /// Touches the shared atomic ~once per `ID_BATCH` spans rather than
212    /// once per span.
213    #[inline]
214    pub(crate) fn allocate_actual_id(&self) -> u64 {
215        ID_CURSOR.with(|cell| {
216            let cursor = cell.get();
217            if (cursor & (ID_BATCH - 1)) != 0 {
218                cell.set(cursor + 1);
219                cursor
220            } else {
221                let start = self.id_high_water.fetch_add(ID_BATCH, Relaxed);
222                cell.set(start + 1);
223                start
224            }
225        })
226    }
227
228    #[inline]
229    pub(crate) fn encode_tracing_id(&self, shard: usize, slab_idx: usize) -> u64 {
230        ((shard as u64) << self.shard_shift) | ((slab_idx as u64) + SLAB_OFFSET)
231    }
232
233    /// Decode a tracing id into `(shard, slab_idx)`.  Returns `None` for
234    /// `DISABLED` or anything outside the encoding scheme.
235    #[inline]
236    pub(crate) fn decode_tracing_id(&self, id: u64) -> Option<(usize, usize)> {
237        if id == DISABLED {
238            return None;
239        }
240        let slab_mask = (1u64 << self.shard_shift) - 1;
241        let raw = id & slab_mask;
242        if raw < SLAB_OFFSET {
243            return None;
244        }
245        let shard = ((id >> self.shard_shift) & self.shard_mask) as usize;
246        Some((shard, (raw - SLAB_OFFSET) as usize))
247    }
248
249    /// Read a slab slot's `actual_id` from the lock-free sidecar.
250    /// `Acquire` pairs with the `Release` store in `new_span` so the
251    /// value is visible once the encoded tracing id has been published
252    /// to the caller.
253    #[inline]
254    pub(crate) fn load_actual_id(&self, shard: usize, slab_idx: usize) -> u64 {
255        self.in_flight[shard].actual_ids[slab_idx].load(Ordering::Acquire)
256    }
257
258    /// Resolve the `actual_id` (i.e. the [`SpanRecord::id`] published
259    /// on the fan-out stream) for an in-flight span addressed by
260    /// its `tracing::span::Id` u64.  Lock-free `Acquire` load from the
261    /// per-shard sidecar — does not touch the slab `Mutex`.
262    pub fn actual_id_for(&self, tracing_id: u64) -> Option<u64> {
263        let (shard, slab_idx) = self.decode_tracing_id(tracing_id)?;
264        Some(self.load_actual_id(shard, slab_idx))
265    }
266
267    /// Register a new subscriber.  The returned `Receiver` yields
268    /// every closed span the cache produces from this call onward,
269    /// in the driver's commit (close-time) order.  The cache holds
270    /// no history — if you need to see spans from before the call,
271    /// subscribe earlier.
272    ///
273    /// Replaces the previous `page(after_id, _)` API, which keyed on
274    /// open-order `actual_id` and so silently dropped spans whenever
275    /// close order diverged from open order — the norm under async
276    /// workloads.
277    ///
278    /// `capacity` is the soft cap on in-flight spans for this
279    /// subscriber.  When the receiver falls behind by that much, the
280    /// driver logs and drops a whole batch — slow consumers don't back
281    /// up the pipeline.  Drop the receiver to unsubscribe; the driver
282    /// prunes the sender lazily on the next fan-out.
283    pub fn subscribe(&self, capacity: u64) -> spillway::Receiver<SpanRecord> {
284        // concurrency = 1: the driver is the only producer.
285        let (sender, receiver) = spillway::channel_with_capacity_and_concurrency(capacity, 1);
286        #[allow(clippy::expect_used, reason = "poisoned lock")]
287        self.subscribers
288            .lock()
289            .expect("lock must not be poisoned")
290            .push(sender);
291        receiver
292    }
293
    /// Drains the calling thread's two PENDING buffers (spans + events)
    /// into their respective spillway channels.  Must be called before
    /// [`Driver::drain_sync`] in tests to ensure all recently-closed
    /// spans and emitted events are delivered.
    ///
    /// Only the calling thread's buffers are touched.  Events are
    /// drained before spans.  A batch rejected with `Error::Full` is
    /// logged at debug level and dropped; any other `send_many` error
    /// is ignored without logging.
    pub fn flush_pending(&self) {
        THREAD_SENDERS.with(|sc| {
            // SAFETY: cell is thread-local and we only hold the &mut for
            // the duration of this closure; nothing inside re-enters
            // THREAD_SENDERS.
            // NOTE(review): this relies on the `log::debug!` calls below
            // never dispatching back into this subscriber (e.g. through
            // a log-to-tracing bridge) — confirm for the deployed logger.
            let slot = unsafe { &mut *sc.get() };
            // The cached sender clones are keyed by this cache's address,
            // so a thread that switches to a different `SpanCache`
            // re-clones its senders instead of reusing the old ones.
            let cache_addr = self as *const _ as usize;
            let needs_init = !matches!(slot, Some(t) if t.cache_addr == cache_addr);
            if needs_init {
                *slot = Some(ThreadSenders {
                    cache_addr,
                    span: self.span_sender.clone(),
                    event: self.event_sender.clone(),
                });
            }
            // SAFETY: `slot` was just guaranteed to be `Some`.
            let senders = unsafe { slot.as_ref().unwrap_unchecked() };
            // Avoid send_many on empty drains — spillway's chute
            // invariant rejects sender clones that publish without
            // having ever held content.  On `Error::Full`, the rejected
            // drain is bound to the match arm and dropped, which drops
            // each unsent record (and runs `ReuseRef::Drop` for events,
            // returning the EventRecord allocation to the pool).
            pending_drain_events(|events| {
                if events.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.event.send_many(events)
                {
                    log::debug!("event channel full; dropping a batch — driver is behind");
                }
            });
            pending_drain_spans(|spans| {
                if spans.len() > 0
                    && let Err(spillway::Error::Full(_dropped)) = senders.span.send_many(spans)
                {
                    log::debug!("span channel full; dropping a batch — driver is behind");
                }
            });
        });
    }
337}
338
339// ── Subscriber impl ──────────────────────────────────────────────────────────
340
341impl<P: EnabledPredicate> tracing::Subscriber for SpanCache<P> {
    /// Forwards the predicate's maximum-level hint unchanged.
    fn max_level_hint(&self) -> Option<LevelFilter> {
        self.predicate.max_level_hint()
    }
345
346    fn register_callsite(
347        &self,
348        metadata: &'static Metadata<'static>,
349    ) -> tracing::subscriber::Interest {
350        match self.predicate.callsite_enabled(metadata) {
351            Interest::Never => tracing::subscriber::Interest::never(),
352            Interest::Sometimes => tracing::subscriber::Interest::sometimes(),
353            Interest::Always => tracing::subscriber::Interest::always(),
354        }
355    }
356
357    fn enabled(&self, metadata: &Metadata<'_>) -> bool {
358        if matches!(stack_top(), Some(s) if s.tracing_id == DISABLED) {
359            return false;
360        }
361        self.predicate.enabled(metadata)
362    }
363
    /// Applies the predicate to the event's metadata.  Unlike
    /// `enabled`, this does not look at the span stack.
    fn event_enabled(&self, event: &tracing::Event<'_>) -> bool {
        self.predicate.enabled(event.metadata())
    }
367
    /// Opens a span: resolves its parent, builds the `SpanRecord`,
    /// inserts it into the calling thread's lane and returns the
    /// encoded tracing id.
    ///
    /// Returns the disabled id (nothing is recorded) when:
    /// * the span is contextual and the stack is empty or topped by a
    ///   disabled span;
    /// * the span is an explicit root but a span is active on the stack;
    /// * an explicit parent id does not decode;
    /// * the predicate rejects the span;
    /// * the chosen lane already holds `shard_capacity` open spans.
    ///
    /// # Panics
    ///
    /// Panics if the lane's slab mutex is poisoned.
    fn new_span(&self, attrs: &tracing::span::Attributes<'_>) -> tracing::span::Id {
        // Step A: resolve parent's actual_id from a side-channel — no
        // slab lock.  Contextual: the parent's `actual_id` is right there
        // on the SPAN_STACK entry.  Explicit: the parent's tracing id
        // encodes its slab address, and the sidecar holds its actual_id.
        let parent_actual_id: Option<u64> = if attrs.is_contextual() {
            match stack_top() {
                None => return disabled_id(),
                Some(top) if top.tracing_id == DISABLED => return disabled_id(),
                Some(top) => Some(top.actual_id),
            }
        } else if attrs.is_root() {
            if stack_top().is_some() {
                log::warn!("root span created with an active span on the stack; disabling");
                return disabled_id();
            }
            None
        } else {
            // `attrs.parent()` is `Some` in this arm: the `else`
            // branch is reached after `!is_contextual() && !is_root()`,
            // which leaves only the "explicit parent" case in
            // tracing's span-attributes model.
            let Some(parent) = attrs.parent() else {
                return disabled_id();
            };
            let explicit = id_to_u64(parent);
            match self.decode_tracing_id(explicit) {
                // Lock-free read from the sidecar: the parent's actual_id
                // was published by a `Release` store before the parent's
                // tracing id was returned to the caller, so this `Acquire`
                // load sees it.
                Some((p_shard, p_slab)) => Some(self.load_actual_id(p_shard, p_slab)),
                None => return disabled_id(),
            }
        };

        // Step B: predicate check.
        if !self.predicate.new_span_enabled(attrs) {
            return disabled_id();
        }

        // Step C: build record outside the lock so field-visitor work
        // doesn't extend the critical section.  Note the `actual_id` is
        // allocated here, before the capacity check in step D, so a
        // span rejected for capacity still consumes an id.
        let actual_id = self.allocate_actual_id();
        let mut record = SpanRecord {
            id: actual_id,
            parent_id: parent_actual_id,
            metadata: attrs.metadata(),
            fields: FieldList::new(),
            events: Vec::new(),
            opened_at: Instant::now(),
            closed_at: None,
        };
        attrs.record(&mut FieldVisitor {
            fields: &mut record.fields,
        });

        // Step D: pick our shard, capacity-check + slab.insert under the
        // Mutex, then drop the guard before the sidecar store and the
        // pure-arithmetic id encoding.  The sidecar is a separate atomic
        // and the Release/Acquire pair on `actual_ids[slab_idx]` is its
        // own happens-before — nobody can observe this slot until we
        // return the tracing id below, so the store doesn't need to be
        // sequenced under the slab Mutex.
        let shard = self.pick_shard();
        let lane = &self.in_flight[shard];
        let slab_idx = {
            #[allow(clippy::expect_used, reason = "poisoned lock")]
            let mut slab = lane.slab.lock().expect("lock must not be poisoned");
            if slab.len() >= self.shard_capacity {
                log::warn!(
                    "span shard {shard} full; new span disabled. \
                     Increase capacity or reduce span rate."
                );
                return disabled_id();
            }
            slab.insert(record)
        };
        // The sidecar is sized to `shard_capacity` and indexed by
        // slab_idx, which is bounded by capacity per the check above.
        lane.actual_ids[slab_idx].store(actual_id, Ordering::Release);
        u64_to_id(self.encode_tracing_id(shard, slab_idx))
    }
451
452    fn record(&self, span: &tracing::span::Id, values: &tracing::span::Record<'_>) {
453        let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(span)) {
454            Some(t) => t,
455            None => return,
456        };
457        #[allow(clippy::expect_used, reason = "poisoned lock")]
458        let mut shard_lock = self.in_flight[shard]
459            .slab
460            .lock()
461            .expect("lock must not be poisoned");
462        if let Some(rec) = shard_lock.get_mut(slab_idx) {
463            values.record(&mut FieldVisitor {
464                fields: &mut rec.fields,
465            });
466        }
467    }
468
    /// Intentionally a no-op: follows-from relationships are not recorded.
    fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
470
471    fn event(&self, event: &tracing::Event<'_>) {
472        // Resolve the parent's `actual_id` lock-free.  Contextual events
473        // get it straight off the SPAN_STACK entry; events with an
474        // explicit parent decode the tracing id and `Acquire`-load from
475        // the per-shard sidecar — no slab lock either way.
476        let parent_actual_id = match event.parent().map(id_to_u64) {
477            Some(tracing_id) => {
478                if tracing_id == DISABLED {
479                    log::debug!("event dropped: parent span is disabled");
480                    return;
481                }
482                match self.decode_tracing_id(tracing_id) {
483                    Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
484                    None => return,
485                }
486            }
487            None => match stack_top() {
488                Some(top) if top.tracing_id == DISABLED => {
489                    log::debug!("event dropped: parent span is disabled");
490                    return;
491                }
492                Some(top) => top.actual_id,
493                None => {
494                    log::debug!("event dropped: no active span");
495                    return;
496                }
497            },
498        };
499
500        // Acquire a pooled EventRecord, fill it in place.  The pooled
501        // FieldList allocation is preserved across reuse.
502        let mut record = self.event_pool.acquire();
503        record.metadata = Some(event.metadata());
504        record.recorded_at = Some(Instant::now());
505        record.fields.clear();
506        event.record(&mut FieldVisitor {
507            fields: &mut record.fields,
508        });
509
510        // Hand off to the driver via the event PENDING — no slab lock
511        // here.  The driver attaches to the parent's `events` vec
512        // (directly if the parent's already in the map, or via the
513        // side buffer if the event raced ahead of the span).
514        if pending_push_event(EventMessage {
515            parent_actual_id,
516            record,
517        }) >= self.pending_batch
518        {
519            self.flush_pending();
520        }
521    }
522
523    fn enter(&self, span: &tracing::span::Id) {
524        // Resolve actual_id once, lock-free, and stash it on the stack so
525        // a contextual `new_span` underneath can read its parent's
526        // actual_id without locking the parent's slab.
527        let tracing_id = id_to_u64(span);
528        let actual_id = match self.decode_tracing_id(tracing_id) {
529            Some((shard, slab_idx)) => self.load_actual_id(shard, slab_idx),
530            None => 0, // DISABLED entry — actual_id is never read.
531        };
532        stack_push(StackedSpan {
533            tracing_id,
534            actual_id,
535        });
536    }
537
    /// Pops the top entry of this thread's span stack.  The span id
    /// argument is not inspected, so this relies on enter/exit calls
    /// being properly nested.
    fn exit(&self, _span: &tracing::span::Id) {
        stack_pop();
    }
541
542    fn try_close(&self, id: tracing::span::Id) -> bool {
543        let (shard, slab_idx) = match self.decode_tracing_id(id_to_u64(&id)) {
544            Some(t) => t,
545            None => return false,
546        };
547
548        // Single slab lookup via `try_remove` (no contains-then-remove
549        // double hash), and `Instant::now()` lives outside the critical
550        // section — only paid on the success path.
551        #[allow(clippy::expect_used, reason = "poisoned lock")]
552        let record = self.in_flight[shard]
553            .slab
554            .lock()
555            .expect("lock must not be poisoned")
556            .try_remove(slab_idx);
557
558        if let Some(mut record) = record {
559            record.closed_at = Some(Instant::now());
560            if pending_push_span(record) >= self.pending_batch {
561                self.flush_pending();
562            }
563            true
564        } else {
565            false
566        }
567    }
568}