1use std::collections::BTreeMap;
18use std::sync::{Arc, Mutex};
19
20use crate::object_pool::ReuseRef;
21use crate::record::{EventRecord, SpanRecord};
22
23pub struct EventMessage {
25 pub parent_actual_id: u64,
26 pub record: ReuseRef<EventRecord>,
27}
28
29pub struct Driver {
30 pub(crate) span_receiver: spillway::Receiver<SpanRecord>,
31 pub(crate) event_receiver: spillway::Receiver<EventMessage>,
32 pub(crate) capacity: usize,
39 pub(crate) side_events: BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
42 pub(crate) subscribers: Arc<Mutex<Vec<spillway::Sender<SpanRecord>>>>,
46}
47
48impl Driver {
49 pub async fn run(self) {
52 let Driver {
53 mut span_receiver,
54 mut event_receiver,
55 capacity,
56 mut side_events,
57 subscribers,
58 } = self;
59
60 let mut span_closed = false;
61 let mut event_closed = false;
62 loop {
63 tokio::select! {
64 biased;
65 event_batch = event_receiver.next_batch(), if !event_closed => {
66 match event_batch {
67 Some(batch) => Self::flush_event_batch(
68 &mut side_events, capacity, batch,
69 ),
70 None => event_closed = true,
71 }
72 }
73 span_batch = span_receiver.next_batch(), if !span_closed => {
74 match span_batch {
75 Some(batch) => Self::flush_span_batch(
76 &mut side_events, &subscribers, batch,
77 ),
78 None => span_closed = true,
79 }
80 }
81 else => break,
82 }
83 if span_closed && event_closed {
84 break;
85 }
86 }
87 }
88
89 pub fn drain_sync(self) {
94 let Driver {
95 mut span_receiver,
96 mut event_receiver,
97 capacity,
98 mut side_events,
99 subscribers,
100 } = self;
101
102 let mut events = Vec::new();
103 while let Some(e) = event_receiver.try_next() {
104 events.push(e);
105 }
106 Self::flush_event_batch(&mut side_events, capacity, events.into_iter());
107
108 let mut spans = Vec::new();
109 while let Some(s) = span_receiver.try_next() {
110 spans.push(s);
111 }
112 Self::flush_span_batch(&mut side_events, &subscribers, spans.into_iter());
113 }
114
115 pub(crate) fn flush_span_batch(
116 side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
117 subscribers: &Mutex<Vec<spillway::Sender<SpanRecord>>>,
118 batch: impl ExactSizeIterator<Item = SpanRecord>,
119 ) {
120 if batch.len() == 0 {
121 return;
122 }
123 let mut prepared: Vec<SpanRecord> = Vec::with_capacity(batch.len());
127 let any_side = !side_events.is_empty();
128 for mut span in batch {
129 if any_side && let Some(events) = side_events.remove(&span.id) {
130 span.events.extend(events);
131 }
132 prepared.push(span);
133 }
134
135 #[allow(clippy::expect_used, reason = "poisoned lock")]
136 let mut subs = subscribers.lock().expect("lock must not be poisoned");
137 fanout_under_lock(&mut subs, prepared);
138 }
139
140 pub(crate) fn flush_event_batch(
141 side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
142 capacity: usize,
143 batch: impl ExactSizeIterator<Item = EventMessage>,
144 ) {
145 if batch.len() == 0 {
146 return;
147 }
148 for EventMessage {
149 parent_actual_id,
150 record,
151 } in batch
152 {
153 if let Some(events) = side_events.get_mut(&parent_actual_id) {
154 events.push(record);
155 continue;
156 }
157 if side_events.len() >= capacity {
162 side_events.pop_first();
163 }
164 side_events.insert(parent_actual_id, vec![record]);
165 }
166 }
167}
168
169fn fanout_under_lock(subs: &mut Vec<spillway::Sender<SpanRecord>>, prepared: Vec<SpanRecord>) {
176 if prepared.is_empty() {
177 return;
178 }
179 subs.retain(|sender| match sender.send_many(prepared.iter().cloned()) {
180 Ok(()) => true,
181 Err(spillway::Error::Closed(_)) => false,
182 Err(spillway::Error::Full(_)) => {
183 log::debug!("subscriber channel full; dropping a batch of closed spans");
184 true
185 }
186 });
187}
188
189#[cfg(test)]
190mod tests {
191 use std::time::Instant;
192
193 use tracing::callsite::{Callsite, DefaultCallsite, Identifier};
194 use tracing::field::FieldSet;
195 use tracing::metadata::Kind;
196 use tracing::{Level, Metadata};
197
198 use super::*;
199 use crate::object_pool::ObjectPool;
200 use crate::record::FieldList;
201
202 static CALLSITE: DefaultCallsite = {
206 static META: Metadata<'static> = Metadata::new(
207 "driver_test_span",
208 "driver::test",
209 Level::INFO,
210 None,
211 None,
212 None,
213 FieldSet::new(&[], Identifier(&CALLSITE)),
214 Kind::SPAN,
215 );
216 DefaultCallsite::new(&META)
217 };
218
219 fn test_metadata() -> &'static Metadata<'static> {
220 CALLSITE.metadata()
221 }
222
223 fn make_event(pool: &ObjectPool<EventRecord>, parent_id: u64) -> EventMessage {
224 let mut record = pool.acquire();
225 record.metadata = Some(test_metadata());
226 record.recorded_at = Some(Instant::now());
227 record.fields = FieldList::default();
228 EventMessage {
229 parent_actual_id: parent_id,
230 record,
231 }
232 }
233
234 fn make_span(id: u64) -> SpanRecord {
235 SpanRecord {
236 id,
237 parent_id: None,
238 metadata: test_metadata(),
239 fields: FieldList::default(),
240 events: Vec::new(),
241 opened_at: Instant::now(),
242 closed_at: Some(Instant::now()),
243 }
244 }
245
246 fn no_subscribers() -> Mutex<Vec<spillway::Sender<SpanRecord>>> {
247 Mutex::new(Vec::new())
248 }
249
250 type Side = BTreeMap<u64, Vec<ReuseRef<EventRecord>>>;
251
252 fn bucket_len(side: &Side, parent_id: u64) -> Option<usize> {
253 side.get(&parent_id).map(Vec::len)
254 }
255
256 #[test]
257 fn event_orphan_below_capacity_stashes_for_parent() {
258 let pool = ObjectPool::<EventRecord>::new(1, 16);
261 let mut side: Side = BTreeMap::new();
262
263 let events = vec![make_event(&pool, 99), make_event(&pool, 99)];
264 Driver::flush_event_batch(&mut side, 8, events.into_iter());
265 assert_eq!(bucket_len(&side, 99), Some(2));
266
267 Driver::flush_span_batch(&mut side, &no_subscribers(), std::iter::once(make_span(99)));
270 assert!(
271 side.is_empty(),
272 "side bucket for 99 must drain on span arrival"
273 );
274 }
275
276 #[test]
277 fn span_arrival_attaches_parked_events_to_fanout() {
278 let pool = ObjectPool::<EventRecord>::new(1, 16);
282 let mut side: Side = BTreeMap::new();
283 let subs = Mutex::new(Vec::new());
284
285 Driver::flush_event_batch(
287 &mut side,
288 8,
289 vec![make_event(&pool, 99), make_event(&pool, 99)].into_iter(),
290 );
291
292 let (sender, mut rx) = spillway::channel_with_capacity_and_concurrency(64, 1);
294 #[allow(clippy::expect_used, reason = "test")]
295 subs.lock().expect("test").push(sender);
296 Driver::flush_span_batch(&mut side, &subs, std::iter::once(make_span(99)));
297
298 let span = rx.try_next().expect("subscriber should receive span 99");
299 assert_eq!(span.id, 99);
300 assert_eq!(span.events.len(), 2);
301 }
302
303 #[test]
304 fn event_orphan_at_capacity_evicts_oldest_parent_id() {
305 const CAP: usize = 4;
310 let pool = ObjectPool::<EventRecord>::new(1, 16);
311 let mut side: Side = BTreeMap::new();
312
313 let mut fill: Vec<EventMessage> = Vec::new();
314 for parent in [10u64, 20, 30, 40] {
315 fill.push(make_event(&pool, parent));
316 }
317 Driver::flush_event_batch(&mut side, CAP, fill.into_iter());
318 assert_eq!(side.len(), CAP);
319 let ids: Vec<u64> = side.keys().copied().collect();
320 assert_eq!(ids, vec![10, 20, 30, 40]);
321
322 Driver::flush_event_batch(&mut side, CAP, std::iter::once(make_event(&pool, 999)));
323 let ids: Vec<u64> = side.keys().copied().collect();
324 assert_eq!(ids, vec![20, 30, 40, 999], "smallest id must be evicted");
325 assert_eq!(bucket_len(&side, 999), Some(1));
326 assert!(bucket_len(&side, 10).is_none());
327 }
328
329 #[test]
330 fn event_orphan_at_capacity_grows_existing_parent_without_eviction() {
331 const CAP: usize = 2;
334 let pool = ObjectPool::<EventRecord>::new(1, 16);
335 let mut side: Side = BTreeMap::new();
336
337 Driver::flush_event_batch(
338 &mut side,
339 CAP,
340 vec![make_event(&pool, 1), make_event(&pool, 2)].into_iter(),
341 );
342 assert_eq!(side.len(), CAP);
343 assert_eq!(bucket_len(&side, 1), Some(1));
344
345 Driver::flush_event_batch(
349 &mut side,
350 CAP,
351 vec![make_event(&pool, 1), make_event(&pool, 1)].into_iter(),
352 );
353 assert_eq!(side.len(), CAP);
354 assert_eq!(bucket_len(&side, 1), Some(3));
355 assert_eq!(bucket_len(&side, 2), Some(1));
356 }
357
358 #[test]
359 fn event_orphan_appends_to_existing_parent_below_capacity() {
360 const CAP: usize = 8;
363 let pool = ObjectPool::<EventRecord>::new(1, 16);
364 let mut side: Side = BTreeMap::new();
365
366 Driver::flush_event_batch(
367 &mut side,
368 CAP,
369 vec![
370 make_event(&pool, 7),
371 make_event(&pool, 7),
372 make_event(&pool, 7),
373 ]
374 .into_iter(),
375 );
376 assert_eq!(side.len(), 1);
377 assert_eq!(bucket_len(&side, 7), Some(3));
378 }
379
380 #[test]
381 fn event_orphan_eviction_drops_entire_bucket_not_just_one_event() {
382 const CAP: usize = 2;
386 let pool = ObjectPool::<EventRecord>::new(1, 16);
387 let mut side: Side = BTreeMap::new();
388
389 Driver::flush_event_batch(
392 &mut side,
393 CAP,
394 vec![
395 make_event(&pool, 1),
396 make_event(&pool, 1),
397 make_event(&pool, 1),
398 make_event(&pool, 2),
399 ]
400 .into_iter(),
401 );
402 assert_eq!(bucket_len(&side, 1), Some(3));
403
404 Driver::flush_event_batch(&mut side, CAP, std::iter::once(make_event(&pool, 7)));
407 let ids: Vec<u64> = side.keys().copied().collect();
408 assert_eq!(ids, vec![2, 7]);
409 assert!(bucket_len(&side, 1).is_none());
410 }
411}