// devtools_core/aggregator.rs

1use crate::{Command, Event, Shared, Watcher};
2use devtools_wire_format::logs::LogEvent;
3use devtools_wire_format::spans::SpanEvent;
4use devtools_wire_format::{instrument, logs, spans, NewMetadata};
5use futures::FutureExt;
6use ringbuf::consumer::Consumer;
7use ringbuf::traits::{Observer, RingBuffer};
8use ringbuf::HeapRb;
9use std::mem;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime};
13use tokio::sync::mpsc;
14
/// The event aggregator
///
/// This is the heart of the instrumentation, it receives events from the
/// [`Layer`], does light pre-processing, buffers them up into `Update`s and
/// send these to all subscribed clients.
pub struct Aggregator {
    /// Data shared with the [`Layer`]
    shared: Arc<Shared>,
    /// Channel of events from the [`Layer`]
    events: mpsc::Receiver<Event>,
    /// Channel of commands from the gRPC server
    cmds: mpsc::Receiver<Command>,

    /// All metadata entries that were ever registered
    /// This is fine to keep around bc there won't be many of them
    all_metadata: Vec<NewMetadata>,
    /// Metadata entries that were registered since the last update
    /// This is emptied on every update
    new_metadata: Vec<NewMetadata>,

    /// Buffered log events.
    /// Up to 512 events are retained before the oldest will be dropped.
    logs: EventBuf<LogEvent, 512>,
    /// Buffered span events.
    /// Up to 512 events are retained before the oldest will be dropped.
    spans: EventBuf<SpanEvent, 512>,

    /// All connected clients
    /// Each receives a copy of every published update; watchers whose
    /// channel is full or closed are dropped in `publish`.
    watchers: Vec<Watcher>,

    /// Used to convert `Instant`s to `SystemTime`s and `Timestamp`s
    pub(crate) base_time: TimeAnchor,
}
48
/// Whether to include all buffered events or only those that were buffered since the last update
///
/// `All` is used for the initial update sent to a newly attached watcher,
/// `IncrementalOnly` for the periodic updates published to already-connected watchers.
#[derive(Debug, Copy, Clone)]
enum Include {
    /// Include all buffered events
    All,
    /// Include only events that were buffered since the last update
    IncrementalOnly,
}
57
impl Aggregator {
    /// Creates a new aggregator with empty buffers from the shared state,
    /// the event channel (from the [`Layer`]) and the command channel
    /// (from the gRPC server).
    pub fn new(
        shared: Arc<Shared>,
        events: mpsc::Receiver<Event>,
        cmds: mpsc::Receiver<Command>,
    ) -> Self {
        Self {
            shared,
            events,
            cmds,
            watchers: vec![],
            logs: EventBuf::new(),
            spans: EventBuf::new(),
            all_metadata: vec![],
            new_metadata: vec![],
            base_time: TimeAnchor::new(),
        }
    }

    /// Runs the aggregator event loop until the command channel closes.
    ///
    /// Each iteration waits for either a publish tick, a flush notification
    /// from the [`Layer`], or a command from the gRPC server; it then drains
    /// every event that is immediately available and, on a tick, publishes an
    /// incremental update to all watchers. A final update is flushed to the
    /// watchers before returning.
    pub async fn run(mut self, publish_interval: Duration) {
        let mut interval = tokio::time::interval(publish_interval);

        loop {
            let should_publish = tokio::select! {
                // periodic publish tick
                _ = interval.tick() => true,
                // the layer signalled that its buffer is filling up:
                // drain events now, but don't publish ahead of schedule
                () = self.shared.flush.notified() => {
                    tracing::debug!("event buffer approaching capacity, flushing...");
                    false
                },
                cmd = self.cmds.recv() => {
                    if let Some(Command::Instrument(watcher)) = cmd {
                        self.attach_watcher(watcher).await;
                    } else {
                        // `None` means the sending side (the gRPC server) is gone
                        tracing::debug!("gRPC server closed, terminating...");
                        // TODO set health status to NOT_SERVING?
                        break;
                    }

                    false
                }
            };

            // Drain all events that are ready right now, without awaiting.
            while let Some(event) = self.events.recv().now_or_never() {
                if let Some(event) = event {
                    self.update_state(event);
                } else {
                    // NOTE(review): this `break` only exits the inner drain
                    // loop, not `run` itself, despite the "terminating"
                    // message — the outer loop keeps running until `cmds`
                    // closes (the `initial_update` test relies on that).
                    // Confirm whether the log wording is intended.
                    tracing::debug!("event channel closed; terminating");
                    break;
                }
            }

            if should_publish {
                self.publish();
            }
        }

        // attempt to flush updates to all watchers before closing
        self.publish();
    }

    /// Sends the initial update — all buffered events plus *all* metadata
    /// ever registered — to a newly connected client and, only if that send
    /// succeeds, registers the client for future incremental updates.
    async fn attach_watcher(&mut self, watcher: Watcher) {
        let now = Instant::now();

        let log_update = self.log_update(Include::All);
        let span_update = self.span_update(Include::All);

        let update = instrument::Update {
            at: Some(self.base_time.to_timestamp(now)),
            new_metadata: self.all_metadata.clone(),
            logs_update: Some(log_update),
            spans_update: Some(span_update),
        };

        match watcher.tx.send(Ok(update)).await {
            Ok(()) => {
                self.watchers.push(watcher);
            }
            Err(err) => {
                tracing::warn!("Failed to send initial update to client because of error {err:?}");
            }
        }
    }

    /// Folds a single event received from the [`Layer`] into the internal
    /// metadata lists and log/span ring buffers.
    fn update_state(&mut self, event: Event) {
        match event {
            Event::Metadata(metadata) => {
                // recorded both in the all-time list and the incremental one
                self.all_metadata.push(metadata.into());
                self.new_metadata.push(metadata.into());
            }
            Event::Event {
                at,
                metadata,
                message,
                fields,
                maybe_parent,
            } => {
                self.logs.push_overwrite(LogEvent {
                    at: Some(self.base_time.to_timestamp(at)),
                    // the metadata's address is used as its unique id
                    metadata_id: metadata as *const _ as u64,
                    message,
                    fields,
                    parent: maybe_parent.map(|id| id.into_u64()),
                });
            }
            Event::NewSpan {
                at,
                id,
                metadata,
                fields,
                maybe_parent,
            } => {
                self.spans.push_overwrite(SpanEvent::new_span(
                    self.base_time.to_timestamp(at),
                    &id,
                    metadata,
                    fields,
                    maybe_parent,
                ));
            }
            Event::EnterSpan {
                at,
                span_id,
                thread_id,
            } => {
                self.spans.push_overwrite(SpanEvent::enter_span(
                    self.base_time.to_timestamp(at),
                    &span_id,
                    thread_id,
                ));
            }
            Event::ExitSpan {
                at,
                span_id,
                thread_id,
            } => {
                self.spans.push_overwrite(SpanEvent::exit_span(
                    self.base_time.to_timestamp(at),
                    &span_id,
                    thread_id,
                ));
            }
            Event::CloseSpan { at, span_id } => {
                self.spans.push_overwrite(SpanEvent::close_span(
                    self.base_time.to_timestamp(at),
                    &span_id,
                ));
            }
            Event::SpanRecorded { span_id, fields } => {
                self.spans
                    .push_overwrite(SpanEvent::span_recorded(&span_id, fields));
            }
        }
    }

    /// Builds a [`logs::Update`] for the given `include` mode.
    ///
    /// With [`Include::All`] the dropped-events counter is only read; with
    /// [`Include::IncrementalOnly`] it is atomically reset to zero as well.
    fn log_update(&mut self, include: Include) -> logs::Update {
        let log_events = match include {
            Include::All => self.logs.iter().cloned().collect(),
            Include::IncrementalOnly => self.logs.take_unsent().cloned().collect(),
        };

        let dropped_events = match include {
            Include::All => self.shared.dropped_log_events.load(Ordering::Acquire) as u64,
            Include::IncrementalOnly => {
                self.shared.dropped_log_events.swap(0, Ordering::AcqRel) as u64
            }
        };

        logs::Update {
            log_events,
            dropped_events,
        }
    }

    /// Builds a [`spans::Update`] for the given `include` mode.
    ///
    /// Mirrors [`Self::log_update`] but for span events.
    fn span_update(&mut self, include: Include) -> spans::Update {
        let span_events = match include {
            Include::All => self.spans.iter().cloned().collect(),
            Include::IncrementalOnly => self.spans.take_unsent().cloned().collect(),
        };

        let dropped_events = match include {
            Include::All => self.shared.dropped_span_events.load(Ordering::Acquire) as u64,
            Include::IncrementalOnly => {
                self.shared.dropped_span_events.swap(0, Ordering::AcqRel) as u64
            }
        };

        spans::Update {
            span_events,
            dropped_events,
        }
    }

    /// Publishes an incremental update (metadata and events buffered since
    /// the last update) to every watcher.
    ///
    /// Watchers whose channel is full or closed are removed from the list.
    fn publish(&mut self) {
        let now = Instant::now();

        // take the incremental metadata, leaving an empty Vec behind
        let new_metadata = mem::take(&mut self.new_metadata);
        let log_update = self.log_update(Include::IncrementalOnly);
        let span_update = self.span_update(Include::IncrementalOnly);

        let update = instrument::Update {
            at: Some(self.base_time.to_timestamp(now)),
            new_metadata,
            logs_update: Some(log_update),
            spans_update: Some(span_update),
        };

        self.watchers
            .retain(|w| w.tx.try_send(Ok(update.clone())).is_ok());
    }
}
268
/// Used to convert `Instant`s to `SystemTime`s and `Timestamp`s
///
/// It pairs a monotonic [`Instant`] with the [`SystemTime`] captured at the
/// same moment, so later `Instant`s can be mapped onto the system clock.
pub struct TimeAnchor {
    /// The monotonic reference point
    mono: Instant,
    /// The wall-clock time captured together with `mono`
    sys: SystemTime,
}
274
275impl Default for TimeAnchor {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281impl TimeAnchor {
282    #[must_use]
283    pub fn new() -> Self {
284        Self {
285            mono: Instant::now(),
286            sys: SystemTime::now(),
287        }
288    }
289
290    #[must_use]
291    pub fn to_system_time(&self, t: Instant) -> SystemTime {
292        let dur = t
293            .checked_duration_since(self.mono)
294            .unwrap_or_else(|| Duration::from_secs(0));
295        self.sys + dur
296    }
297
298    #[must_use]
299    pub fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
300        self.to_system_time(t).into()
301    }
302}
303
/// A fixed-size buffer for events
///
/// This is essentially a FIFO queue, that will discard the oldest item when full.
///
/// It also keeps a counter of how many events in the buffer were sent to clients.
/// [`EventBuf::take_unsent`] will return all events that were not sent yet and reset the counter.
struct EventBuf<T, const CAP: usize> {
    /// The backing ring buffer, allocated with capacity `CAP`
    inner: HeapRb<T>,
    /// How many of the currently buffered events were already sent to clients
    sent: usize,
}
314
315impl<T, const CAP: usize> EventBuf<T, CAP> {
316    pub fn new() -> Self {
317        Self {
318            inner: HeapRb::new(CAP),
319            sent: 0,
320        }
321    }
322
323    /// Push an event into the buffer, overwriting the oldest event if the buffer is full.
324    pub fn push_overwrite(&mut self, item: T) {
325        if self.inner.push_overwrite(item).is_some() {
326            self.sent = self.sent.saturating_sub(1);
327        }
328    }
329
330    /// Returns an iterator over all events that were not sent yet.
331    pub fn take_unsent(&mut self) -> impl Iterator<Item = &T> {
332        let iter = self.inner.iter().skip(self.sent);
333        self.sent = self.inner.occupied_len();
334        iter
335    }
336
337    /// Returns an iterator over all events in the buffer.
338    pub fn iter(&self) -> impl Iterator<Item = &T> {
339        self.inner.iter()
340    }
341}
342
#[cfg(test)]
mod test {
    use super::*;
    use crate::layer::Layer;
    use devtools_wire_format::instrument::Update;
    use tracing_subscriber::prelude::*;

    /// Exercises the `sent` bookkeeping of [`EventBuf`]: each `take_unsent`
    /// call must yield exactly the events pushed since the previous call.
    #[test]
    fn ringbuf() {
        let mut buf: EventBuf<u8, 5> = EventBuf::new();

        buf.push_overwrite(1);
        let one: Vec<_> = buf.take_unsent().copied().collect();

        buf.push_overwrite(2);
        buf.push_overwrite(3);
        buf.push_overwrite(4);
        let two: Vec<_> = buf.take_unsent().copied().collect();

        buf.push_overwrite(5);
        buf.push_overwrite(6);
        let three: Vec<_> = buf.take_unsent().copied().collect();

        assert_eq!(one, [1]);
        assert_eq!(two, [2, 3, 4]);
        assert_eq!(three, [5, 6]);
    }

    /// Attaches a watcher, closes the command channel so the aggregator's
    /// event loop terminates, and collects every update the watcher received.
    async fn drain_updates(mf: Aggregator, cmd_tx: mpsc::Sender<Command>) -> Vec<Update> {
        let (client_tx, mut client_rx) = mpsc::channel(1);
        cmd_tx
            .send(Command::Instrument(Watcher { tx: client_tx }))
            .await
            .unwrap();
        drop(cmd_tx);

        mf.run(Duration::from_millis(10)).await; // run the aggregators event loop to completion

        let mut out = Vec::new();
        while let Some(Ok(update)) = client_rx.recv().await {
            out.push(update);
        }
        out
    }

    /// A freshly attached watcher must receive an (empty) initial update.
    #[tokio::test]
    async fn initial_update() {
        let (_, evt_rx) = mpsc::channel(1);
        let (cmd_tx, cmd_rx) = mpsc::channel(1);

        let mf = Aggregator::new(Default::default(), evt_rx, cmd_rx);

        let (client_tx, mut client_rx) = mpsc::channel(1);
        cmd_tx
            .send(Command::Instrument(Watcher { tx: client_tx }))
            .await
            .unwrap();
        drop(cmd_tx); // drop the cmd_tx connection here, this will stop the aggregator

        let (maybe_update, _) = futures::join!(client_rx.recv(), mf.run(Duration::from_millis(10)));
        let update = maybe_update.unwrap().unwrap();
        assert_eq!(update.logs_update.unwrap().log_events.len(), 0);
        assert_eq!(update.spans_update.unwrap().span_events.len(), 0);
        assert_eq!(update.new_metadata.len(), 0);
    }

    /// Events recorded through the [`Layer`] must reach a connected watcher.
    #[tokio::test]
    async fn log_events() {
        let shared = Arc::new(Shared::default());
        let (evt_tx, evt_rx) = mpsc::channel(1);
        let (cmd_tx, cmd_rx) = mpsc::channel(1);

        let layer = Layer::new(shared.clone(), evt_tx);
        let mf = Aggregator::new(shared, evt_rx, cmd_rx);

        // `set_default` returns a `DefaultGuard` that un-installs the
        // subscriber when dropped. It must be bound to a variable — as a
        // bare statement the guard dropped immediately and the layer was
        // never active for the `tracing::debug!` call below.
        let _guard = tracing_subscriber::registry().with(layer).set_default();

        tracing::debug!("an event!");

        let updates = drain_updates(mf, cmd_tx).await;
        assert_eq!(updates.len(), 1);
    }
}
425}