1use std::collections::BTreeMap;
15use std::sync::{Arc, RwLock};
16
17use crate::object_pool::ReuseRef;
18use crate::record::{EventRecord, SpanRecord};
19
/// An event record travelling to the [`Driver`], tagged with the id of the
/// span it should be attached to.
pub struct EventMessage {
    /// Key under which `Driver::flush_event_batch` looks up the parent span in
    /// the span map and, when the span is not there, the side-event table.
    pub parent_actual_id: u64,
    /// The event payload; pushed onto the parent span's `events` list or into
    /// a side-event bucket.
    pub record: ReuseRef<EventRecord>,
}
25
/// Moves spans and events from their receivers into a shared, capacity-bounded
/// span map.
pub struct Driver {
    /// Span store keyed by span id. The flush functions take its write lock
    /// while inserting spans or attaching events.
    pub(crate) map: Arc<RwLock<BTreeMap<u64, SpanRecord>>>,
    /// Source of span batches (`next_batch`) and single spans (`try_next`).
    pub(crate) span_receiver: spillway::Receiver<SpanRecord>,
    /// Source of event batches (`next_batch`) and single events (`try_next`).
    pub(crate) event_receiver: spillway::Receiver<EventMessage>,
    /// Bound applied both to the number of spans kept in `map` and to the
    /// number of parent buckets kept in `side_events`.
    pub(crate) capacity: usize,
    /// Events whose parent span was not in `map` when they were flushed,
    /// grouped by parent id until that span is flushed.
    pub(crate) side_events: BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
}
40
41impl Driver {
42 pub async fn run(self) {
45 let Driver {
46 map,
47 mut span_receiver,
48 mut event_receiver,
49 capacity,
50 mut side_events,
51 } = self;
52
53 let mut span_closed = false;
54 let mut event_closed = false;
55 loop {
56 tokio::select! {
57 span_batch = span_receiver.next_batch(), if !span_closed => {
58 match span_batch {
59 Some(batch) => Self::flush_span_batch(
60 &map, &mut side_events, capacity, batch,
61 ),
62 None => span_closed = true,
63 }
64 }
65 event_batch = event_receiver.next_batch(), if !event_closed => {
66 match event_batch {
67 Some(batch) => Self::flush_event_batch(
68 &map, &mut side_events, capacity, batch,
69 ),
70 None => event_closed = true,
71 }
72 }
73 else => break,
74 }
75 if span_closed && event_closed {
76 break;
77 }
78 }
79 }
80
81 pub fn drain_sync(self) {
86 let Driver {
87 map,
88 mut span_receiver,
89 mut event_receiver,
90 capacity,
91 mut side_events,
92 ..
93 } = self;
94
95 let mut events = Vec::new();
96 while let Some(e) = event_receiver.try_next() {
97 events.push(e);
98 }
99 Self::flush_event_batch(&map, &mut side_events, capacity, events.into_iter());
100
101 let mut spans = Vec::new();
102 while let Some(s) = span_receiver.try_next() {
103 spans.push(s);
104 }
105 Self::flush_span_batch(&map, &mut side_events, capacity, spans.into_iter());
106 }
107
108 pub(crate) fn flush_span_batch(
109 map: &RwLock<BTreeMap<u64, SpanRecord>>,
110 side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
111 capacity: usize,
112 batch: impl ExactSizeIterator<Item = SpanRecord>,
113 ) {
114 if batch.len() == 0 {
115 return;
116 }
117 #[allow(clippy::expect_used, reason = "poisoned lock")]
118 let mut m = map.write().expect("lock must not be poisoned");
119 let any_side = !side_events.is_empty();
120 for mut span in batch {
121 if any_side && let Some(events) = side_events.remove(&span.id) {
124 span.events.extend(events);
125 }
126 while m.len() >= capacity {
127 if m.pop_first().is_none() {
128 break;
129 }
130 }
131 m.insert(span.id, span);
132 }
133 }
134
135 pub(crate) fn flush_event_batch(
136 map: &RwLock<BTreeMap<u64, SpanRecord>>,
137 side_events: &mut BTreeMap<u64, Vec<ReuseRef<EventRecord>>>,
138 capacity: usize,
139 batch: impl ExactSizeIterator<Item = EventMessage>,
140 ) {
141 if batch.len() == 0 {
142 return;
143 }
144 #[allow(clippy::expect_used, reason = "poisoned lock")]
145 let mut m = map.write().expect("lock must not be poisoned");
146 for EventMessage {
147 parent_actual_id,
148 record,
149 } in batch
150 {
151 if let Some(span) = m.get_mut(&parent_actual_id) {
152 span.events.push(record);
153 continue;
154 }
155 if let Some(events) = side_events.get_mut(&parent_actual_id) {
156 events.push(record);
157 continue;
158 }
159 if side_events.len() >= capacity {
164 side_events.pop_first();
165 }
166 side_events.insert(parent_actual_id, vec![record]);
167 }
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use std::sync::Arc;
174 use std::time::Instant;
175
176 use tracing::callsite::{Callsite, DefaultCallsite, Identifier};
177 use tracing::field::FieldSet;
178 use tracing::metadata::Kind;
179 use tracing::{Level, Metadata};
180
181 use super::*;
182 use crate::object_pool::ObjectPool;
183 use crate::record::FieldList;
184
    // Callsite backing the metadata used by every record built in these tests.
    // `META` and `CALLSITE` refer to each other: the metadata's field set is
    // identified by `CALLSITE`, and `CALLSITE` is constructed from `META`.
    static CALLSITE: DefaultCallsite = {
        // Span-kind metadata at INFO level with an empty field set. The three
        // `None`s are the optional file / line / module-path arguments of
        // `Metadata::new` (per the `tracing` API; confirm against the pinned
        // version).
        static META: Metadata<'static> = Metadata::new(
            "driver_test_span",
            "driver::test",
            Level::INFO,
            None,
            None,
            None,
            FieldSet::new(&[], Identifier(&CALLSITE)),
            Kind::SPAN,
        );
        DefaultCallsite::new(&META)
    };
201
202 fn test_metadata() -> &'static Metadata<'static> {
203 CALLSITE.metadata()
204 }
205
206 fn make_event(pool: &ObjectPool<EventRecord>, parent_id: u64) -> EventMessage {
207 let mut record = pool.acquire();
208 record.metadata = Some(test_metadata());
209 record.recorded_at = Some(Instant::now());
210 record.fields = FieldList::default();
211 EventMessage {
212 parent_actual_id: parent_id,
213 record,
214 }
215 }
216
217 fn make_span(id: u64) -> SpanRecord {
218 SpanRecord {
219 id,
220 parent_id: None,
221 metadata: test_metadata(),
222 fields: FieldList::default(),
223 events: Vec::new(),
224 opened_at: Instant::now(),
225 closed_at: Some(Instant::now()),
226 }
227 }
228
229 fn empty_map() -> Arc<RwLock<BTreeMap<u64, SpanRecord>>> {
230 Arc::new(RwLock::new(BTreeMap::new()))
231 }
232
    /// Same shape as `Driver::side_events`: parent span id mapped to the
    /// events parked for that span.
    type Side = BTreeMap<u64, Vec<ReuseRef<EventRecord>>>;
234
235 fn bucket_len(side: &Side, parent_id: u64) -> Option<usize> {
236 side.get(&parent_id).map(Vec::len)
237 }
238
239 #[test]
240 fn event_orphan_below_capacity_stashes_for_parent() {
241 let pool = ObjectPool::<EventRecord>::new(1, 16);
244 let map = empty_map();
245 let mut side: Side = BTreeMap::new();
246
247 let events = vec![make_event(&pool, 99), make_event(&pool, 99)];
248 Driver::flush_event_batch(&map, &mut side, 8, events.into_iter());
249 assert_eq!(bucket_len(&side, 99), Some(2));
250 assert!(
251 map.read().unwrap().is_empty(),
252 "events must not insert spans"
253 );
254
255 Driver::flush_span_batch(&map, &mut side, 8, std::iter::once(make_span(99)));
257 assert!(
258 side.is_empty(),
259 "side bucket for 99 must drain on span arrival"
260 );
261 let m = map.read().unwrap();
262 let span = m.get(&99).expect("span 99 inserted");
263 assert_eq!(span.events.len(), 2);
264 }
265
266 #[test]
267 fn event_orphan_at_capacity_evicts_oldest_parent_id() {
268 const CAP: usize = 4;
273 let pool = ObjectPool::<EventRecord>::new(1, 16);
274 let map = empty_map();
275 let mut side: Side = BTreeMap::new();
276
277 let mut fill: Vec<EventMessage> = Vec::new();
278 for parent in [10u64, 20, 30, 40] {
279 fill.push(make_event(&pool, parent));
280 }
281 Driver::flush_event_batch(&map, &mut side, CAP, fill.into_iter());
282 assert_eq!(side.len(), CAP);
283 let ids: Vec<u64> = side.keys().copied().collect();
284 assert_eq!(ids, vec![10, 20, 30, 40]);
285
286 Driver::flush_event_batch(
287 &map,
288 &mut side,
289 CAP,
290 std::iter::once(make_event(&pool, 999)),
291 );
292 let ids: Vec<u64> = side.keys().copied().collect();
293 assert_eq!(ids, vec![20, 30, 40, 999], "smallest id must be evicted");
294 assert_eq!(bucket_len(&side, 999), Some(1));
295 assert!(bucket_len(&side, 10).is_none());
296 }
297
298 #[test]
299 fn event_orphan_at_capacity_grows_existing_parent_without_eviction() {
300 const CAP: usize = 2;
303 let pool = ObjectPool::<EventRecord>::new(1, 16);
304 let map = empty_map();
305 let mut side: Side = BTreeMap::new();
306
307 Driver::flush_event_batch(
308 &map,
309 &mut side,
310 CAP,
311 vec![make_event(&pool, 1), make_event(&pool, 2)].into_iter(),
312 );
313 assert_eq!(side.len(), CAP);
314 assert_eq!(bucket_len(&side, 1), Some(1));
315
316 Driver::flush_event_batch(
320 &map,
321 &mut side,
322 CAP,
323 vec![make_event(&pool, 1), make_event(&pool, 1)].into_iter(),
324 );
325 assert_eq!(side.len(), CAP);
326 assert_eq!(bucket_len(&side, 1), Some(3));
327 assert_eq!(bucket_len(&side, 2), Some(1));
328 }
329
330 #[test]
331 fn event_orphan_appends_to_existing_parent_below_capacity() {
332 const CAP: usize = 8;
335 let pool = ObjectPool::<EventRecord>::new(1, 16);
336 let map = empty_map();
337 let mut side: Side = BTreeMap::new();
338
339 Driver::flush_event_batch(
340 &map,
341 &mut side,
342 CAP,
343 vec![
344 make_event(&pool, 7),
345 make_event(&pool, 7),
346 make_event(&pool, 7),
347 ]
348 .into_iter(),
349 );
350 assert_eq!(side.len(), 1);
351 assert_eq!(bucket_len(&side, 7), Some(3));
352 }
353
354 #[test]
355 fn event_orphan_eviction_drops_entire_bucket_not_just_one_event() {
356 const CAP: usize = 2;
360 let pool = ObjectPool::<EventRecord>::new(1, 16);
361 let map = empty_map();
362 let mut side: Side = BTreeMap::new();
363
364 Driver::flush_event_batch(
367 &map,
368 &mut side,
369 CAP,
370 vec![
371 make_event(&pool, 1),
372 make_event(&pool, 1),
373 make_event(&pool, 1),
374 make_event(&pool, 2),
375 ]
376 .into_iter(),
377 );
378 assert_eq!(bucket_len(&side, 1), Some(3));
379
380 Driver::flush_event_batch(&map, &mut side, CAP, std::iter::once(make_event(&pool, 7)));
383 let ids: Vec<u64> = side.keys().copied().collect();
384 assert_eq!(ids, vec![2, 7]);
385 assert!(bucket_len(&side, 1).is_none());
386 }
387}