1use crate::{Command, Event, Shared, Watcher};
2use devtools_wire_format::logs::LogEvent;
3use devtools_wire_format::spans::SpanEvent;
4use devtools_wire_format::{instrument, logs, spans, NewMetadata};
5use futures::FutureExt;
6use ringbuf::consumer::Consumer;
7use ringbuf::traits::{Observer, RingBuffer};
8use ringbuf::HeapRb;
9use std::mem;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime};
13use tokio::sync::mpsc;
14
/// Collects [`Event`]s into bounded buffers and publishes them, as
/// `instrument::Update`s, to every attached [`Watcher`].
pub struct Aggregator {
    /// Shared state: the flush notification awaited by `run` and the
    /// dropped-event counters reported in every update.
    shared: Arc<Shared>,
    /// Channel delivering the events to aggregate.
    events: mpsc::Receiver<Event>,
    /// Channel delivering commands, e.g. requests to attach a new watcher.
    cmds: mpsc::Receiver<Command>,

    /// Every piece of metadata received so far; cloned into the initial
    /// update sent to a newly attached watcher.
    all_metadata: Vec<NewMetadata>,
    /// Metadata received since the last publish; taken (and thereby
    /// cleared) each time an update is published.
    new_metadata: Vec<NewMetadata>,

    /// Buffered log events, created with a capacity parameter of 512.
    logs: EventBuf<LogEvent, 512>,
    /// Buffered span events, created with a capacity parameter of 512.
    spans: EventBuf<SpanEvent, 512>,

    /// Clients that currently receive published updates.
    watchers: Vec<Watcher>,

    /// Anchor used to convert `Instant`s into wire-format timestamps.
    pub(crate) base_time: TimeAnchor,
}
48
/// Selects how much buffered data goes into an update.
#[derive(Clone, Copy, Debug)]
enum Include {
    /// Everything currently held in the buffer.
    All,
    /// Only the entries that have not been handed out yet.
    IncrementalOnly,
}
57
58impl Aggregator {
59 pub fn new(
60 shared: Arc<Shared>,
61 events: mpsc::Receiver<Event>,
62 cmds: mpsc::Receiver<Command>,
63 ) -> Self {
64 Self {
65 shared,
66 events,
67 cmds,
68 watchers: vec![],
69 logs: EventBuf::new(),
70 spans: EventBuf::new(),
71 all_metadata: vec![],
72 new_metadata: vec![],
73 base_time: TimeAnchor::new(),
74 }
75 }
76
77 pub async fn run(mut self, publish_interval: Duration) {
78 let mut interval = tokio::time::interval(publish_interval);
79
80 loop {
81 let should_publish = tokio::select! {
82 _ = interval.tick() => true,
83 () = self.shared.flush.notified() => {
84 tracing::debug!("event buffer approaching capacity, flushing...");
85 false
86 },
87 cmd = self.cmds.recv() => {
88 if let Some(Command::Instrument(watcher)) = cmd {
89 self.attach_watcher(watcher).await;
90 } else {
91 tracing::debug!("gRPC server closed, terminating...");
92 break;
94 }
95
96 false
97 }
98 };
99
100 while let Some(event) = self.events.recv().now_or_never() {
101 if let Some(event) = event {
102 self.update_state(event);
103 } else {
104 tracing::debug!("event channel closed; terminating");
105 break;
106 }
107 }
108
109 if should_publish {
110 self.publish();
111 }
112 }
113
114 self.publish();
116 }
117
118 async fn attach_watcher(&mut self, watcher: Watcher) {
119 let now = Instant::now();
120
121 let log_update = self.log_update(Include::All);
122 let span_update = self.span_update(Include::All);
123
124 let update = instrument::Update {
125 at: Some(self.base_time.to_timestamp(now)),
126 new_metadata: self.all_metadata.clone(),
127 logs_update: Some(log_update),
128 spans_update: Some(span_update),
129 };
130
131 match watcher.tx.send(Ok(update)).await {
132 Ok(()) => {
133 self.watchers.push(watcher);
134 }
135 Err(err) => {
136 tracing::warn!("Failed to send initial update to client because of error {err:?}");
137 }
138 }
139 }
140
141 fn update_state(&mut self, event: Event) {
142 match event {
143 Event::Metadata(metadata) => {
144 self.all_metadata.push(metadata.into());
145 self.new_metadata.push(metadata.into());
146 }
147 Event::Event {
148 at,
149 metadata,
150 message,
151 fields,
152 maybe_parent,
153 } => {
154 self.logs.push_overwrite(LogEvent {
155 at: Some(self.base_time.to_timestamp(at)),
156 metadata_id: metadata as *const _ as u64,
157 message,
158 fields,
159 parent: maybe_parent.map(|id| id.into_u64()),
160 });
161 }
162 Event::NewSpan {
163 at,
164 id,
165 metadata,
166 fields,
167 maybe_parent,
168 } => {
169 self.spans.push_overwrite(SpanEvent::new_span(
170 self.base_time.to_timestamp(at),
171 &id,
172 metadata,
173 fields,
174 maybe_parent,
175 ));
176 }
177 Event::EnterSpan {
178 at,
179 span_id,
180 thread_id,
181 } => {
182 self.spans.push_overwrite(SpanEvent::enter_span(
183 self.base_time.to_timestamp(at),
184 &span_id,
185 thread_id,
186 ));
187 }
188 Event::ExitSpan {
189 at,
190 span_id,
191 thread_id,
192 } => {
193 self.spans.push_overwrite(SpanEvent::exit_span(
194 self.base_time.to_timestamp(at),
195 &span_id,
196 thread_id,
197 ));
198 }
199 Event::CloseSpan { at, span_id } => {
200 self.spans.push_overwrite(SpanEvent::close_span(
201 self.base_time.to_timestamp(at),
202 &span_id,
203 ));
204 }
205 Event::SpanRecorded { span_id, fields } => {
206 self.spans
207 .push_overwrite(SpanEvent::span_recorded(&span_id, fields));
208 }
209 }
210 }
211
212 fn log_update(&mut self, include: Include) -> logs::Update {
213 let log_events = match include {
214 Include::All => self.logs.iter().cloned().collect(),
215 Include::IncrementalOnly => self.logs.take_unsent().cloned().collect(),
216 };
217
218 let dropped_events = match include {
219 Include::All => self.shared.dropped_log_events.load(Ordering::Acquire) as u64,
220 Include::IncrementalOnly => {
221 self.shared.dropped_log_events.swap(0, Ordering::AcqRel) as u64
222 }
223 };
224
225 logs::Update {
226 log_events,
227 dropped_events,
228 }
229 }
230
231 fn span_update(&mut self, include: Include) -> spans::Update {
232 let span_events = match include {
233 Include::All => self.spans.iter().cloned().collect(),
234 Include::IncrementalOnly => self.spans.take_unsent().cloned().collect(),
235 };
236
237 let dropped_events = match include {
238 Include::All => self.shared.dropped_span_events.load(Ordering::Acquire) as u64,
239 Include::IncrementalOnly => {
240 self.shared.dropped_span_events.swap(0, Ordering::AcqRel) as u64
241 }
242 };
243
244 spans::Update {
245 span_events,
246 dropped_events,
247 }
248 }
249
250 fn publish(&mut self) {
251 let now = Instant::now();
252
253 let new_metadata = mem::take(&mut self.new_metadata);
254 let log_update = self.log_update(Include::IncrementalOnly);
255 let span_update = self.span_update(Include::IncrementalOnly);
256
257 let update = instrument::Update {
258 at: Some(self.base_time.to_timestamp(now)),
259 new_metadata,
260 logs_update: Some(log_update),
261 spans_update: Some(span_update),
262 };
263
264 self.watchers
265 .retain(|w| w.tx.try_send(Ok(update.clone())).is_ok());
266 }
267}
268
/// A pair of clock readings — one monotonic, one wall-clock — stored
/// together so that an `Instant` can later be mapped onto a `SystemTime`.
#[derive(Debug, Clone, Copy)]
pub struct TimeAnchor {
    /// Monotonic clock reading.
    mono: Instant,
    /// Wall-clock reading paired with `mono`.
    sys: SystemTime,
}
274
275impl Default for TimeAnchor {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
281impl TimeAnchor {
282 #[must_use]
283 pub fn new() -> Self {
284 Self {
285 mono: Instant::now(),
286 sys: SystemTime::now(),
287 }
288 }
289
290 #[must_use]
291 pub fn to_system_time(&self, t: Instant) -> SystemTime {
292 let dur = t
293 .checked_duration_since(self.mono)
294 .unwrap_or_else(|| Duration::from_secs(0));
295 self.sys + dur
296 }
297
298 #[must_use]
299 pub fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
300 self.to_system_time(t).into()
301 }
302}
303
/// Ring buffer of events that also tracks how many of its oldest entries
/// have already been handed out by `take_unsent`.
///
/// `CAP` is the capacity the underlying ring buffer is created with.
struct EventBuf<T, const CAP: usize> {
    /// Backing ring buffer.
    inner: HeapRb<T>,
    /// Number of entries at the front of `inner` that `take_unsent` has
    /// already returned.
    sent: usize,
}
314
315impl<T, const CAP: usize> EventBuf<T, CAP> {
316 pub fn new() -> Self {
317 Self {
318 inner: HeapRb::new(CAP),
319 sent: 0,
320 }
321 }
322
323 pub fn push_overwrite(&mut self, item: T) {
325 if self.inner.push_overwrite(item).is_some() {
326 self.sent = self.sent.saturating_sub(1);
327 }
328 }
329
330 pub fn take_unsent(&mut self) -> impl Iterator<Item = &T> {
332 let iter = self.inner.iter().skip(self.sent);
333 self.sent = self.inner.occupied_len();
334 iter
335 }
336
337 pub fn iter(&self) -> impl Iterator<Item = &T> {
339 self.inner.iter()
340 }
341}
342
#[cfg(test)]
mod test {
    use super::*;
    use crate::layer::Layer;
    use devtools_wire_format::instrument::Update;
    use tracing_subscriber::prelude::*;

    /// `take_unsent` yields each pushed item exactly once, including after
    /// the buffer has wrapped around and overwritten an already-sent entry.
    #[test]
    fn ringbuf() {
        let mut buf: EventBuf<u8, 5> = EventBuf::new();

        buf.push_overwrite(1);
        let one: Vec<_> = buf.take_unsent().copied().collect();

        buf.push_overwrite(2);
        buf.push_overwrite(3);
        buf.push_overwrite(4);
        let two: Vec<_> = buf.take_unsent().copied().collect();

        buf.push_overwrite(5);
        // Sixth push into a capacity-5 buffer: overwrites the oldest entry.
        buf.push_overwrite(6);
        let three: Vec<_> = buf.take_unsent().copied().collect();

        assert_eq!(one, [1]);
        assert_eq!(two, [2, 3, 4]);
        assert_eq!(three, [5, 6]);
    }

    /// Attaches a single watcher, closes the command channel so the
    /// aggregator terminates, runs it to completion and returns every update
    /// the watcher received.
    async fn drain_updates(mf: Aggregator, cmd_tx: mpsc::Sender<Command>) -> Vec<Update> {
        let (client_tx, mut client_rx) = mpsc::channel(1);
        cmd_tx
            .send(Command::Instrument(Watcher { tx: client_tx }))
            .await
            .unwrap();
        drop(cmd_tx);

        mf.run(Duration::from_millis(10)).await;

        let mut out = Vec::new();
        while let Some(Ok(update)) = client_rx.recv().await {
            out.push(update);
        }
        out
    }

    /// A freshly attached watcher receives an (empty) initial update.
    #[tokio::test]
    async fn initial_update() {
        let (_, evt_rx) = mpsc::channel(1);
        let (cmd_tx, cmd_rx) = mpsc::channel(1);

        let mf = Aggregator::new(Default::default(), evt_rx, cmd_rx);

        let (client_tx, mut client_rx) = mpsc::channel(1);
        cmd_tx
            .send(Command::Instrument(Watcher { tx: client_tx }))
            .await
            .unwrap();
        drop(cmd_tx);

        let (maybe_update, _) =
            futures::join!(client_rx.recv(), mf.run(Duration::from_millis(10)));

        let update = maybe_update.unwrap().unwrap();
        assert_eq!(update.logs_update.unwrap().log_events.len(), 0);
        assert_eq!(update.spans_update.unwrap().span_events.len(), 0);
        assert_eq!(update.new_metadata.len(), 0);
    }

    /// An event emitted while the layer is installed ends up in a single
    /// update delivered to the watcher.
    #[tokio::test]
    async fn log_events() {
        let shared = Arc::new(Shared::default());
        let (evt_tx, evt_rx) = mpsc::channel(1);
        let (cmd_tx, cmd_rx) = mpsc::channel(1);

        let layer = Layer::new(shared.clone(), evt_tx);
        let mf = Aggregator::new(shared, evt_rx, cmd_rx);

        // `set_default` returns a guard that unregisters the subscriber when
        // dropped. It must stay bound for the rest of the test; otherwise the
        // layer is already gone by the time the event below is emitted.
        let _guard = tracing_subscriber::registry().with(layer).set_default();

        tracing::debug!("an event!");

        let updates = drain_updates(mf, cmd_tx).await;
        assert_eq!(updates.len(), 1);
    }
}