Skip to main content

tracing_cache/
driver.rs

//! Background task that drains closed spans and emitted events from two
//! spillway channels, attaching each event to its parent span and
//! writing the result into the shared `BTreeMap`.
//!
//! Two channels (rather than one enum-typed channel) keep each pipeline
//! type-pure: span-only workloads pay no enum-match cost on the driver
//! side, and each spillway carries a homogeneous payload at its natural
//! size.  Ordering across the two channels isn't preserved, so the
//! driver's side buffer (`Driver::side_events`) absorbs temporal
//! misordering: an event that arrives before its parent span has been
//! inserted into the map is parked there under its `parent_actual_id`,
//! and the span's arrival drains that bucket and attaches the events.

14use std::collections::BTreeMap;
15use std::sync::{Arc, RwLock};
16
17use crate::object_pool::ReuseRef;
18use crate::record::{EventRecord, SpanRecord};
19
20/// Event payload on the event spillway channel.
21pub struct EventMessage {
22    pub parent_actual_id: u64,
23    pub record: ReuseRef<EventRecord>,
24}
25
26pub struct Driver {
27    pub(crate) map: Arc<RwLock<BTreeMap<u64, SpanRecord>>>,
28    pub(crate) span_receiver: spillway::Receiver<SpanRecord>,
29    pub(crate) event_receiver: spillway::Receiver<EventMessage>,
30    pub(crate) capacity: usize,
31    /// Events whose parent `SpanRecord` hasn't been inserted yet,
32    /// keyed by `parent_actual_id`.  Bounded by `capacity` distinct
33    /// parent ids; once full, a new parent's first event evicts the
34    /// oldest entry via `BTreeMap::pop_first` — and since
35    /// `parent_actual_id`s are monotonically allocated, the smallest
36    /// key is the oldest span.  Evicted `ReuseRef`s drop back into
37    /// the event pool.
38    pub(crate) side_events: BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
39}
40
41impl Driver {
42    /// Runs the driver loop.  `tokio::select!` pulls whichever channel
43    /// has a batch ready next; terminates when both channels are closed.
44    pub async fn run(self) {
45        let Driver {
46            map,
47            mut span_receiver,
48            mut event_receiver,
49            capacity,
50            mut side_events,
51        } = self;
52
53        let mut span_closed = false;
54        let mut event_closed = false;
55        loop {
56            tokio::select! {
57                span_batch = span_receiver.next_batch(), if !span_closed => {
58                    match span_batch {
59                        Some(batch) => Self::flush_span_batch(
60                            &map, &mut side_events, capacity, batch,
61                        ),
62                        None => span_closed = true,
63                    }
64                }
65                event_batch = event_receiver.next_batch(), if !event_closed => {
66                    match event_batch {
67                        Some(batch) => Self::flush_event_batch(
68                            &map, &mut side_events, capacity, batch,
69                        ),
70                        None => event_closed = true,
71                    }
72                }
73                else => break,
74            }
75            if span_closed && event_closed {
76                break;
77            }
78        }
79    }
80
81    /// Synchronously drain everything currently available on both
82    /// channels.  Used by tests after `cache.flush_pending()`.  Events
83    /// are drained first so that any event whose parent is in this
84    /// drain's span batch lands in the side buffer in time.
85    pub fn drain_sync(self) {
86        let Driver {
87            map,
88            mut span_receiver,
89            mut event_receiver,
90            capacity,
91            mut side_events,
92            ..
93        } = self;
94
95        let mut events = Vec::new();
96        while let Some(e) = event_receiver.try_next() {
97            events.push(e);
98        }
99        Self::flush_event_batch(&map, &mut side_events, capacity, events.into_iter());
100
101        let mut spans = Vec::new();
102        while let Some(s) = span_receiver.try_next() {
103            spans.push(s);
104        }
105        Self::flush_span_batch(&map, &mut side_events, capacity, spans.into_iter());
106    }
107
108    pub(crate) fn flush_span_batch(
109        map: &RwLock<BTreeMap<u64, SpanRecord>>,
110        side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
111        capacity: usize,
112        batch: impl ExactSizeIterator<Item = SpanRecord>,
113    ) {
114        if batch.len() == 0 {
115            return;
116        }
117        #[allow(clippy::expect_used, reason = "poisoned lock")]
118        let mut m = map.write().expect("lock must not be poisoned");
119        let any_side = !side_events.is_empty();
120        for mut span in batch {
121            // Fast-path: skip the lookup when the side buffer has
122            // nothing in it at all (span-only workloads).
123            if any_side && let Some(events) = side_events.remove(&span.id) {
124                span.events.extend(events);
125            }
126            while m.len() >= capacity {
127                if m.pop_first().is_none() {
128                    break;
129                }
130            }
131            m.insert(span.id, span);
132        }
133    }
134
135    pub(crate) fn flush_event_batch(
136        map: &RwLock<BTreeMap<u64, SpanRecord>>,
137        side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
138        capacity: usize,
139        batch: impl ExactSizeIterator<Item = EventMessage>,
140    ) {
141        if batch.len() == 0 {
142            return;
143        }
144        #[allow(clippy::expect_used, reason = "poisoned lock")]
145        let mut m = map.write().expect("lock must not be poisoned");
146        for EventMessage {
147            parent_actual_id,
148            record,
149        } in batch
150        {
151            if let Some(span) = m.get_mut(&parent_actual_id) {
152                span.events.push(record);
153                continue;
154            }
155            if let Some(events) = side_events.get_mut(&parent_actual_id) {
156                events.push(record);
157                continue;
158            }
159            // New parent.  If we're at capacity, evict the oldest
160            // bucket — `BTreeMap::pop_first` returns the smallest
161            // `parent_actual_id`, which is the oldest span by virtue
162            // of monotonic id allocation.  Then park the new bucket.
163            if side_events.len() >= capacity {
164                side_events.pop_first();
165            }
166            side_events.insert(parent_actual_id, vec![record]);
167        }
168    }
169}
170
#[cfg(test)]
mod tests {
    // Unit tests for the driver's two flush paths.  They call
    // `flush_event_batch` / `flush_span_batch` directly against a local map
    // and side buffer, so no channels or async runtime are involved.
    use std::sync::Arc;
    use std::time::Instant;

    use tracing::callsite::{Callsite, DefaultCallsite, Identifier};
    use tracing::field::FieldSet;
    use tracing::metadata::Kind;
    use tracing::{Level, Metadata};

    use super::*;
    use crate::object_pool::ObjectPool;
    use crate::record::FieldList;

    // Static metadata so we can build `SpanRecord`s without spinning up
    // the tracing subscriber.  Pattern lifted from tracing-core's own
    // tests (`missed_register_callsite.rs`).
    static CALLSITE: DefaultCallsite = {
        static META: Metadata<'static> = Metadata::new(
            "driver_test_span",
            "driver::test",
            Level::INFO,
            None,
            None,
            None,
            FieldSet::new(&[], Identifier(&CALLSITE)),
            Kind::SPAN,
        );
        DefaultCallsite::new(&META)
    };

    fn test_metadata() -> &'static Metadata<'static> {
        CALLSITE.metadata()
    }

    fn make_event(pool: &ObjectPool<EventRecord>, parent_id: u64) -> EventMessage {
        let mut record = pool.acquire();
        record.metadata = Some(test_metadata());
        record.recorded_at = Some(Instant::now());
        record.fields = FieldList::default();
        EventMessage {
            parent_actual_id: parent_id,
            record,
        }
    }

    fn make_span(id: u64) -> SpanRecord {
        SpanRecord {
            id,
            parent_id: None,
            metadata: test_metadata(),
            fields: FieldList::default(),
            events: Vec::new(),
            opened_at: Instant::now(),
            closed_at: Some(Instant::now()),
        }
    }

    fn empty_map() -> Arc<RwLock<BTreeMap<u64, SpanRecord>>> {
        Arc::new(RwLock::new(BTreeMap::new()))
    }

    type Side = BTreeMap<u64, Vec<ReuseRef<EventRecord>>>;

    fn bucket_len(side: &Side, parent_id: u64) -> Option<usize> {
        side.get(&parent_id).map(Vec::len)
    }

    #[test]
    fn event_orphan_below_capacity_stashes_for_parent() {
        // Events for an unknown parent should park in `side_events`
        // and survive there until the matching span arrives.
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        let events = vec![make_event(&pool, 99), make_event(&pool, 99)];
        Driver::flush_event_batch(&map, &mut side, 8, events.into_iter());
        assert_eq!(bucket_len(&side, 99), Some(2));
        assert!(
            map.read().unwrap().is_empty(),
            "events must not insert spans"
        );

        // Parent arrives → orphans attach and the side bucket is drained.
        Driver::flush_span_batch(&map, &mut side, 8, std::iter::once(make_span(99)));
        assert!(
            side.is_empty(),
            "side bucket for 99 must drain on span arrival"
        );
        let m = map.read().unwrap();
        let span = m.get(&99).expect("span 99 inserted");
        assert_eq!(span.events.len(), 2);
    }

    #[test]
    fn event_orphan_at_capacity_evicts_oldest_parent_id() {
        // Fill the buffer with CAP distinct parents (ids 10, 20, 30, 40).
        // A new parent (999) arriving at capacity should bump the
        // smallest id (10) — which is the oldest span by virtue of
        // monotonic actual_id allocation — and keep the rest.
        const CAP: usize = 4;
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        let mut fill: Vec<EventMessage> = Vec::new();
        for parent in [10u64, 20, 30, 40] {
            fill.push(make_event(&pool, parent));
        }
        Driver::flush_event_batch(&map, &mut side, CAP, fill.into_iter());
        assert_eq!(side.len(), CAP);
        let ids: Vec<u64> = side.keys().copied().collect();
        assert_eq!(ids, vec![10, 20, 30, 40]);

        Driver::flush_event_batch(
            &map,
            &mut side,
            CAP,
            std::iter::once(make_event(&pool, 999)),
        );
        let ids: Vec<u64> = side.keys().copied().collect();
        assert_eq!(ids, vec![20, 30, 40, 999], "smallest id must be evicted");
        assert_eq!(bucket_len(&side, 999), Some(1));
        assert!(bucket_len(&side, 10).is_none());
    }

    #[test]
    fn event_orphan_at_capacity_grows_existing_parent_without_eviction() {
        // Events for a parent already in the buffer should append to
        // its vec — no eviction, since no new parent slot is claimed.
        const CAP: usize = 2;
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        Driver::flush_event_batch(
            &map,
            &mut side,
            CAP,
            vec![make_event(&pool, 1), make_event(&pool, 2)].into_iter(),
        );
        assert_eq!(side.len(), CAP);
        assert_eq!(bucket_len(&side, 1), Some(1));

        // Two more events for the *existing* parent 1.  Buffer length
        // stays at CAP; parent 1's bucket grows to 3.  Parent 2 is
        // untouched.
        Driver::flush_event_batch(
            &map,
            &mut side,
            CAP,
            vec![make_event(&pool, 1), make_event(&pool, 1)].into_iter(),
        );
        assert_eq!(side.len(), CAP);
        assert_eq!(bucket_len(&side, 1), Some(3));
        assert_eq!(bucket_len(&side, 2), Some(1));
    }

    #[test]
    fn event_orphan_appends_to_existing_parent_below_capacity() {
        // Below the cap, repeated events for the same parent id
        // accumulate in its vec without growing the buffer length.
        const CAP: usize = 8;
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        Driver::flush_event_batch(
            &map,
            &mut side,
            CAP,
            vec![
                make_event(&pool, 7),
                make_event(&pool, 7),
                make_event(&pool, 7),
            ]
            .into_iter(),
        );
        assert_eq!(side.len(), 1);
        assert_eq!(bucket_len(&side, 7), Some(3));
    }

    #[test]
    fn event_orphan_eviction_drops_entire_bucket_not_just_one_event() {
        // When the oldest bucket holds multiple events, eviction
        // discards the whole bucket — those events drop back into
        // the pool together.
        const CAP: usize = 2;
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        // Parent 1 accumulates 3 events; parent 2 has 1.  Both
        // buckets exist; buffer is at CAP.
        Driver::flush_event_batch(
            &map,
            &mut side,
            CAP,
            vec![
                make_event(&pool, 1),
                make_event(&pool, 1),
                make_event(&pool, 1),
                make_event(&pool, 2),
            ]
            .into_iter(),
        );
        assert_eq!(bucket_len(&side, 1), Some(3));

        // New parent 7 evicts parent 1's *entire* bucket — all
        // three events go, not just one.
        Driver::flush_event_batch(&map, &mut side, CAP, std::iter::once(make_event(&pool, 7)));
        let ids: Vec<u64> = side.keys().copied().collect();
        assert_eq!(ids, vec![2, 7]);
        assert!(bucket_len(&side, 1).is_none());
    }

    #[test]
    fn event_for_parent_already_in_map_attaches_without_parking() {
        // When the span is already in the map, its events are pushed
        // straight onto it; the side buffer stays untouched.
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        Driver::flush_span_batch(&map, &mut side, 8, std::iter::once(make_span(5)));
        Driver::flush_event_batch(
            &map,
            &mut side,
            8,
            vec![make_event(&pool, 5), make_event(&pool, 5)].into_iter(),
        );

        assert!(
            side.is_empty(),
            "events for a known parent must not be parked"
        );
        let m = map.read().unwrap();
        assert_eq!(m.get(&5).expect("span 5 inserted").events.len(), 2);
    }

    #[test]
    fn span_arrival_drains_only_its_own_side_bucket() {
        // A span picks up its own parked events and leaves other
        // parents' buckets in place.
        let pool = ObjectPool::<EventRecord>::new(1, 16);
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        Driver::flush_event_batch(
            &map,
            &mut side,
            8,
            vec![
                make_event(&pool, 3),
                make_event(&pool, 4),
                make_event(&pool, 4),
            ]
            .into_iter(),
        );
        Driver::flush_span_batch(&map, &mut side, 8, std::iter::once(make_span(3)));

        assert!(bucket_len(&side, 3).is_none());
        assert_eq!(bucket_len(&side, 4), Some(2), "unrelated bucket must survive");
        let m = map.read().unwrap();
        assert_eq!(m.get(&3).expect("span 3 inserted").events.len(), 1);
        assert!(!m.contains_key(&4), "events must not insert spans");
    }

    #[test]
    fn span_insert_at_capacity_evicts_smallest_id() {
        // The span map is bounded the same way as the side buffer:
        // inserting into a full map drops the smallest span id.
        const CAP: usize = 2;
        let map = empty_map();
        let mut side: Side = BTreeMap::new();

        Driver::flush_span_batch(
            &map,
            &mut side,
            CAP,
            vec![make_span(1), make_span(2), make_span(3)].into_iter(),
        );
        let ids: Vec<u64> = map.read().unwrap().keys().copied().collect();
        assert_eq!(ids, vec![2, 3], "smallest span id must be evicted");
    }
}