Skip to main content

tracing_cache/
driver.rs

1//! Background task that drains closed spans and emitted events from
2//! two spillway channels, attaches events to their parent span, and
3//! fans the resulting `SpanRecord`s out to every live subscriber.
4//!
5//! Two channels (rather than one enum-typed channel) keep each
6//! pipeline type-pure: span-only workloads pay no enum-match cost on
7//! the driver side, and each spillway carries a homogeneous payload
8//! of the natural per-payload size.  Ordering across channels isn't
9//! preserved, but the side buffer below handles temporal misordering:
10//! if an event arrives at the driver before its parent span, it
11//! parks in `side_events` keyed by `parent_actual_id`, and the span's
12//! arrival drains the buffer and attaches the events.  Events that
13//! arrive *after* the parent has been fanned out have nowhere to land
14//! and are dropped — within a single thread `flush_pending` always
15//! sends events before the closing span, so this is rare in practice.
16
17use std::collections::BTreeMap;
18use std::sync::{Arc, Mutex};
19
20use crate::object_pool::ReuseRef;
21use crate::record::{EventRecord, SpanRecord};
22
23/// Event payload on the event spillway channel.
24pub struct EventMessage {
25    pub parent_actual_id: u64,
26    pub record: ReuseRef<EventRecord>,
27}
28
29pub struct Driver {
30    pub(crate) span_receiver: spillway::Receiver<SpanRecord>,
31    pub(crate) event_receiver: spillway::Receiver<EventMessage>,
32    /// Cap on distinct parent ids the orphan-event buffer can hold
33    /// while waiting for their span to land.  Once full, a new
34    /// parent's first event evicts the oldest entry via
35    /// `BTreeMap::pop_first` — and since `parent_actual_id`s are
36    /// monotonically allocated, the smallest key is the oldest span.
37    /// Evicted `ReuseRef`s drop back into the event pool.
38    pub(crate) capacity: usize,
39    /// Events whose parent `SpanRecord` hasn't been fanned out yet,
40    /// keyed by `parent_actual_id`.  See `capacity` for the bound.
41    pub(crate) side_events: BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
42    /// Shared with [`crate::SpanCache::subscribers`].  Each closed
43    /// span the driver processes is cloned out to every entry; senders
44    /// that return `Error::Closed` are removed in place.
45    pub(crate) subscribers: Arc<Mutex<Vec<spillway::Sender<SpanRecord>>>>,
46}
47
48impl Driver {
49    /// Runs the driver loop.  `tokio::select!` pulls whichever channel
50    /// has a batch ready next; terminates when both channels are closed.
51    pub async fn run(self) {
52        let Driver {
53            mut span_receiver,
54            mut event_receiver,
55            capacity,
56            mut side_events,
57            subscribers,
58        } = self;
59
60        let mut span_closed = false;
61        let mut event_closed = false;
62        loop {
63            tokio::select! {
64                biased;
65                event_batch = event_receiver.next_batch(), if !event_closed => {
66                    match event_batch {
67                        Some(batch) => Self::flush_event_batch(
68                            &mut side_events, capacity, batch,
69                        ),
70                        None => event_closed = true,
71                    }
72                }
73                span_batch = span_receiver.next_batch(), if !span_closed => {
74                    match span_batch {
75                        Some(batch) => Self::flush_span_batch(
76                            &mut side_events, &subscribers, batch,
77                        ),
78                        None => span_closed = true,
79                    }
80                }
81                else => break,
82            }
83            if span_closed && event_closed {
84                break;
85            }
86        }
87    }
88
89    /// Synchronously drain everything currently available on both
90    /// channels.  Used by tests after `cache.flush_pending()`.  Events
91    /// are drained first so that any event whose parent is in this
92    /// drain's span batch lands in the side buffer in time.
93    pub fn drain_sync(self) {
94        let Driver {
95            mut span_receiver,
96            mut event_receiver,
97            capacity,
98            mut side_events,
99            subscribers,
100        } = self;
101
102        let mut events = Vec::new();
103        while let Some(e) = event_receiver.try_next() {
104            events.push(e);
105        }
106        Self::flush_event_batch(&mut side_events, capacity, events.into_iter());
107
108        let mut spans = Vec::new();
109        while let Some(s) = span_receiver.try_next() {
110            spans.push(s);
111        }
112        Self::flush_span_batch(&mut side_events, &subscribers, spans.into_iter());
113    }
114
115    pub(crate) fn flush_span_batch(
116        side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
117        subscribers: &Mutex<Vec<spillway::Sender<SpanRecord>>>,
118        batch: impl ExactSizeIterator<Item = SpanRecord>,
119    ) {
120        if batch.len() == 0 {
121            return;
122        }
123        // Attach parked orphan events (if any) to each span before
124        // fan-out.  Done outside the subscribers lock so the visible
125        // critical section is just the send loop.
126        let mut prepared: Vec<SpanRecord> = Vec::with_capacity(batch.len());
127        let any_side = !side_events.is_empty();
128        for mut span in batch {
129            if any_side && let Some(events) = side_events.remove(&span.id) {
130                span.events.extend(events);
131            }
132            prepared.push(span);
133        }
134
135        #[allow(clippy::expect_used, reason = "poisoned lock")]
136        let mut subs = subscribers.lock().expect("lock must not be poisoned");
137        fanout_under_lock(&mut subs, prepared);
138    }
139
140    pub(crate) fn flush_event_batch(
141        side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
142        capacity: usize,
143        batch: impl ExactSizeIterator<Item = EventMessage>,
144    ) {
145        if batch.len() == 0 {
146            return;
147        }
148        for EventMessage {
149            parent_actual_id,
150            record,
151        } in batch
152        {
153            if let Some(events) = side_events.get_mut(&parent_actual_id) {
154                events.push(record);
155                continue;
156            }
157            // New parent.  If we're at capacity, evict the oldest
158            // bucket — `BTreeMap::pop_first` returns the smallest
159            // `parent_actual_id`, which is the oldest span by virtue
160            // of monotonic id allocation.  Then park the new bucket.
161            if side_events.len() >= capacity {
162                side_events.pop_first();
163            }
164            side_events.insert(parent_actual_id, vec![record]);
165        }
166    }
167}
168
169/// Send each prepared span to every live subscriber.  Caller already
170/// holds the subscribers lock.  Slow consumers (`Error::Full`) drop a
171/// whole batch with a debug log; dropped receivers (`Error::Closed`)
172/// are removed in place.  The typical case is one subscriber per
173/// console, so the unconditional `clone()` is a non-issue — it's the
174/// `Vec<ReuseRef<EventRecord>>` clones that dominate.
175fn fanout_under_lock(subs: &mut Vec<spillway::Sender<SpanRecord>>, prepared: Vec<SpanRecord>) {
176    if prepared.is_empty() {
177        return;
178    }
179    subs.retain(|sender| match sender.send_many(prepared.iter().cloned()) {
180        Ok(()) => true,
181        Err(spillway::Error::Closed(_)) => false,
182        Err(spillway::Error::Full(_)) => {
183            log::debug!("subscriber channel full; dropping a batch of closed spans");
184            true
185        }
186    });
187}
188
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use tracing::callsite::{Callsite, DefaultCallsite, Identifier};
    use tracing::field::FieldSet;
    use tracing::metadata::Kind;
    use tracing::{Level, Metadata};

    use super::*;
    use crate::object_pool::ObjectPool;
    use crate::record::FieldList;

    /// Same shape as the driver's orphan-event buffer.
    type Side = BTreeMap<u64, Vec<ReuseRef<EventRecord>>>;

    // A static callsite/metadata pair lets the tests construct
    // `SpanRecord`s and `EventRecord`s without installing a tracing
    // subscriber.  The pattern comes from tracing-core's own tests
    // (`missed_register_callsite.rs`).
    static CALLSITE: DefaultCallsite = {
        static META: Metadata<'static> = Metadata::new(
            "driver_test_span",
            "driver::test",
            Level::INFO,
            None,
            None,
            None,
            FieldSet::new(&[], Identifier(&CALLSITE)),
            Kind::SPAN,
        );
        DefaultCallsite::new(&META)
    };

    /// Metadata shared by every record the tests build.
    fn test_metadata() -> &'static Metadata<'static> {
        CALLSITE.metadata()
    }

    /// Fresh event pool with the parameters every test uses.
    fn event_pool() -> ObjectPool<EventRecord> {
        ObjectPool::<EventRecord>::new(1, 16)
    }

    /// Builds an event message addressed to `parent_id`, backed by a
    /// record acquired from `pool`.
    fn make_event(pool: &ObjectPool<EventRecord>, parent_id: u64) -> EventMessage {
        let mut record = pool.acquire();
        record.metadata = Some(test_metadata());
        record.recorded_at = Some(Instant::now());
        record.fields = FieldList::default();
        EventMessage {
            parent_actual_id: parent_id,
            record,
        }
    }

    /// Builds a closed, event-less, parent-less span with the given id.
    fn make_span(id: u64) -> SpanRecord {
        let opened_at = Instant::now();
        let closed_at = Some(Instant::now());
        SpanRecord {
            id,
            parent_id: None,
            metadata: test_metadata(),
            fields: FieldList::default(),
            events: Vec::new(),
            opened_at,
            closed_at,
        }
    }

    /// An empty subscriber list behind a mutex.
    fn no_subscribers() -> Mutex<Vec<spillway::Sender<SpanRecord>>> {
        Mutex::new(Vec::new())
    }

    /// Number of events parked for `parent_id`, or `None` when that
    /// parent has no bucket.
    fn bucket_len(side: &Side, parent_id: u64) -> Option<usize> {
        side.get(&parent_id).map(|bucket| bucket.len())
    }

    /// Parent ids currently parked, in ascending key order.
    fn parked_ids(side: &Side) -> Vec<u64> {
        side.keys().copied().collect()
    }

    /// Builds one event per entry of `parents` (in order) and pushes
    /// them through `Driver::flush_event_batch` as a single batch.
    fn park(side: &mut Side, capacity: usize, pool: &ObjectPool<EventRecord>, parents: &[u64]) {
        let batch: Vec<EventMessage> = parents
            .iter()
            .map(|&parent| make_event(pool, parent))
            .collect();
        Driver::flush_event_batch(side, capacity, batch.into_iter());
    }

    #[test]
    fn event_orphan_below_capacity_stashes_for_parent() {
        // Events whose parent hasn't been seen yet are parked in the
        // side buffer and stay there until the matching span lands.
        let pool = event_pool();
        let mut side = Side::new();

        park(&mut side, 8, &pool, &[99, 99]);
        assert_eq!(bucket_len(&side, 99), Some(2));

        // Once the parent shows up the orphans are attached to it.
        // There are no subscribers, so the only observable effect is
        // that the side buffer empties.
        Driver::flush_span_batch(&mut side, &no_subscribers(), std::iter::once(make_span(99)));
        assert!(
            side.is_empty(),
            "side bucket for 99 must drain on span arrival"
        );
    }

    #[test]
    fn span_arrival_attaches_parked_events_to_fanout() {
        // What the subscriber receives is the span *with* the events
        // that were parked ahead of it, so events reach subscribers
        // without any historical map in between.
        let pool = event_pool();
        let mut side = Side::new();
        let subs = no_subscribers();

        // Two events wait for parent 99.
        park(&mut side, 8, &pool, &[99, 99]);

        // A subscriber registers, and only then does parent 99 land.
        let (sender, mut rx) = spillway::channel_with_capacity_and_concurrency(64, 1);
        #[allow(clippy::expect_used, reason = "test")]
        subs.lock().expect("test").push(sender);
        Driver::flush_span_batch(&mut side, &subs, std::iter::once(make_span(99)));

        let span = rx.try_next().expect("subscriber should receive span 99");
        assert_eq!(span.id, 99);
        assert_eq!(span.events.len(), 2);
    }

    #[test]
    fn event_orphan_at_capacity_evicts_oldest_parent_id() {
        // With CAP distinct parents parked (10, 20, 30, 40), a brand
        // new parent (999) must push out the smallest id (10), which
        // is the oldest span because actual ids are allocated
        // monotonically.  The other three stay.
        const CAP: usize = 4;
        let pool = event_pool();
        let mut side = Side::new();

        park(&mut side, CAP, &pool, &[10, 20, 30, 40]);
        assert_eq!(side.len(), CAP);
        assert_eq!(parked_ids(&side), vec![10, 20, 30, 40]);

        park(&mut side, CAP, &pool, &[999]);
        assert_eq!(
            parked_ids(&side),
            vec![20, 30, 40, 999],
            "smallest id must be evicted"
        );
        assert_eq!(bucket_len(&side, 999), Some(1));
        assert!(bucket_len(&side, 10).is_none());
    }

    #[test]
    fn event_orphan_at_capacity_grows_existing_parent_without_eviction() {
        // An event for a parent that already owns a bucket is simply
        // appended.  No new slot is claimed, so nothing is evicted
        // even though the buffer is full.
        const CAP: usize = 2;
        let pool = event_pool();
        let mut side = Side::new();

        park(&mut side, CAP, &pool, &[1, 2]);
        assert_eq!(side.len(), CAP);
        assert_eq!(bucket_len(&side, 1), Some(1));

        // Two further events for the already-parked parent 1: the
        // buffer stays at CAP buckets, parent 1 grows to three
        // events, and parent 2 is left alone.
        park(&mut side, CAP, &pool, &[1, 1]);
        assert_eq!(side.len(), CAP);
        assert_eq!(bucket_len(&side, 1), Some(3));
        assert_eq!(bucket_len(&side, 2), Some(1));
    }

    #[test]
    fn event_orphan_appends_to_existing_parent_below_capacity() {
        // Well under the cap, several events for one parent pile up
        // in a single bucket; the number of buckets stays at one.
        const CAP: usize = 8;
        let pool = event_pool();
        let mut side = Side::new();

        park(&mut side, CAP, &pool, &[7, 7, 7]);
        assert_eq!(side.len(), 1);
        assert_eq!(bucket_len(&side, 7), Some(3));
    }

    #[test]
    fn event_orphan_eviction_drops_entire_bucket_not_just_one_event() {
        // Eviction works on whole buckets: if the oldest parent has
        // several events parked, all of them go at once (and drop
        // back into the pool together).
        const CAP: usize = 2;
        let pool = event_pool();
        let mut side = Side::new();

        // Parent 1 collects three events and parent 2 one, which
        // fills the buffer to CAP buckets.
        park(&mut side, CAP, &pool, &[1, 1, 1, 2]);
        assert_eq!(bucket_len(&side, 1), Some(3));

        // Parent 7 is new, so parent 1's bucket is evicted whole:
        // all three of its events disappear, not just one.
        park(&mut side, CAP, &pool, &[7]);
        assert_eq!(parked_ids(&side), vec![2, 7]);
        assert!(bucket_len(&side, 1).is_none());
    }
}
411}