console_subscriber/aggregator/mod.rs

use std::{
    sync::{
        atomic::{AtomicBool, Ordering::*},
        Arc,
    },
    time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
    stats::{self, Unsent},
    ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

/// Aggregates instrumentation traces and prepares state for the instrument
/// server.
///
/// The `Aggregator` is responsible for receiving and organizing the
/// instrumented events and preparing the data to be served to an instrument
/// client. A rough sketch of how the aggregator is wired up follows the
/// struct definition below.
pub struct Aggregator {
    /// Channel of incoming events emitted by `TaskLayer`s.
    events: mpsc::Receiver<Event>,

    /// New incoming RPCs.
    rpcs: mpsc::Receiver<Command>,

    /// The interval at which new data updates are pushed to clients.
    publish_interval: Duration,

    /// How long to keep task data after a task has completed.
    retention: Duration,

    /// Shared state, including a `Notify` that triggers a flush when the event
    /// buffer is approaching capacity.
    shared: Arc<Shared>,

    /// Currently active RPCs streaming state events.
    state_watchers: ShrinkVec<Watch<proto::instrument::State>>,

    /// Currently active RPCs streaming task events.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Currently active RPCs streaming task details events, by task ID.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// *All* metadata for task spans and user-defined spans that we care about.
    ///
    /// This is sent to new clients as part of the initial state.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// *New* metadata that was registered since the last state update.
    ///
    /// This is emptied on every state update.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Map of task IDs to task static data.
    tasks: IdData<Task>,

    /// Map of task IDs to task stats.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Map of resource IDs to resource static data.
    resources: IdData<Resource>,

    /// Map of resource IDs to resource stats.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Map of AsyncOp IDs to AsyncOp static data.
    async_ops: IdData<AsyncOp>,

    /// Map of AsyncOp IDs to AsyncOp stats.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// `PollOp` events that have occurred since the last update.
    ///
    /// This is emptied on every state update.
    poll_ops: Vec<proto::resources::PollOp>,

    /// The time "state" of the aggregator, such as paused or live.
    temporality: proto::instrument::Temporality,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire.
    base_time: stats::TimeAnchor,
}
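
// A rough wiring sketch (illustrative only, not part of this module): the
// layer half of the crate holds the event sender, the instrument server holds
// the command sender, and the aggregator owns the receiving ends. Variable
// names and channel capacities below are assumptions, not the crate's actual
// configuration.
//
//     let (event_tx, events) = mpsc::channel(1024);   // layer half keeps `event_tx`
//     let (command_tx, rpcs) = mpsc::channel(256);    // instrument server keeps `command_tx`
//     let aggregator = Aggregator::new(events, rpcs, &builder, shared.clone(), base_time);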

#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
}

/// Represents static data for resources
struct Resource {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}

/// Represents static data for tasks
struct Task {
    id: Id,
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}

struct AsyncOp {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}

impl Aggregator {
    pub(crate) fn new(
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
        shared: Arc<crate::Shared>,
        base_time: stats::TimeAnchor,
    ) -> Self {
        Self {
            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
            events,
            watchers: Default::default(),
            details_watchers: Default::default(),
            state_watchers: Default::default(),
            all_metadata: Default::default(),
            new_metadata: Default::default(),
            tasks: IdData::default(),
            task_stats: IdData::default(),
            resources: IdData::default(),
            resource_stats: IdData::default(),
            async_ops: IdData::default(),
            async_op_stats: IdData::default(),
            poll_ops: Default::default(),
            temporality: proto::instrument::Temporality::Live,
            base_time,
        }
    }

    /// Runs the aggregator.
    ///
    /// This method will start the aggregator loop and should run as long as
    /// the instrument server is running. If the instrument server stops,
    /// this future can be aborted.
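    ///
    /// A minimal lifecycle sketch (illustrative only; everything here other
    /// than `run` itself is an assumption about the surrounding code, so the
    /// block is not compiled as a doc-test):
    ///
    /// ```ignore
    /// // Drive the aggregator for as long as the instrument server runs.
    /// let aggregator_handle = tokio::spawn(aggregator.run());
    ///
    /// // ...once the instrument server has shut down, stop aggregating.
    /// aggregator_handle.abort();
    /// ```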
    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // if the flush interval elapses, flush data to the client
                _ = publish.tick() => {
                    match self.temporality {
                        proto::instrument::Temporality::Live => true,
                        proto::instrument::Temporality::Paused => false,
                    }
                }

                // triggered when the event buffer is approaching capacity
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // a new command from a client
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        },
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        },
                        Some(Command::WatchState(subscription)) => {
                            self.add_state_subscription(subscription);
                        }
                        Some(Command::Pause) => {
                            self.temporality = proto::instrument::Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = proto::instrument::Temporality::Live;
                        }
                        None => {
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };

                    false
                }
            };

            // drain and aggregate buffered events.
            //
            // Note: we *don't* want to actually await the call to `recv` --- we
            // don't want the aggregator task to be woken on every event,
            // because it will then be woken when its own `poll` calls are
            // exited. that would result in a busy-loop. instead, we only want
            // to be woken when the flush interval has elapsed, or when the
            // channel is almost full.
            let mut drained = false;
            let mut counts = EventCounts::new();
            while let Some(event) = recv_now_or_never(&mut self.events) {
                match event {
                    Some(event) => {
                        counts.update(&event);
                        self.update_state(event);
                        drained = true;
                    }
                    // The channel closed, no more events will be emitted...time
                    // to stop aggregating.
                    None => {
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }
            tracing::debug!(
                async_resource_ops = counts.async_resource_op,
                metadatas = counts.metadata,
                poll_ops = counts.poll_op,
                resources = counts.resource,
                spawns = counts.spawn,
                total = counts.total(),
                "event channel drain loop",
            );

            if !self.state_watchers.is_empty() {
                self.publish_state();
            }

            // flush data to clients, if there are any currently subscribed
            // watchers and we should send a new update.
            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                self.shared.flush.has_flushed();
            }
        }
    }

    fn cleanup_closed(&mut self) {
        // Drop data for all entities (tasks, resources, and async ops) that
        // have completed *and* whose final data has already been sent off.
        let now = Instant::now();
        let has_watchers = !self.watchers.is_empty();
        self.tasks
            .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
        self.resources
            .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
        self.async_ops
            .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
        if !has_watchers {
            self.poll_ops.clear();
        }
    }

    /// Add the instrument subscription to the watchers after sending the first update
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let now = Instant::now();

        let update = loop {
            let update = proto::instrument::Update {
                task_update: Some(self.task_update(Include::All)),
                resource_update: Some(self.resource_update(Include::All)),
                async_op_update: Some(self.async_op_update(Include::All)),
                now: Some(self.base_time.to_timestamp(now)),
                new_metadata: Some(proto::RegisterMetadata {
                    metadata: (*self.all_metadata).clone(),
                }),
            };
            let message_size = update.encoded_len();
            if message_size < MAX_MESSAGE_SIZE {
                // normal case
                break Some(update);
            }
            // If the grpc message is bigger than tokio-console will accept, throw away the oldest
            // inactive data and try again
            self.retention /= 2;
            self.cleanup_closed();
            tracing::debug!(
                retention = ?self.retention,
                message_size,
                max_message_size = MAX_MESSAGE_SIZE,
                "Message too big, reduced retention",
            );

            if self.retention <= self.publish_interval {
                self.retention = self.publish_interval;
                break None;
            }
        };

        match update {
            // Send the initial state
            Some(update) => {
                if !subscription.update(&update) {
                    // If sending the initial update fails, the subscription is already dead,
                    // so don't add it to `watchers`.
                    return;
                }
            }
            // User will only get updates.
            None => tracing::error!(
                min_retention = ?self.publish_interval,
                "Message too big. Start with smaller retention.",
            ),
        }

        self.watchers.push(subscription);
    }
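
    // Worked example of the halving loop above (the numbers are hypothetical,
    // not defaults): with a 64 s retention, a 1 s publish interval, and an
    // initial snapshot that exceeds MAX_MESSAGE_SIZE, retention is cut to
    // 32 s, 16 s, ..., re-encoding and re-checking the size after each
    // `cleanup_closed()`. If retention falls to the publish interval before
    // the snapshot fits under the 4 MiB limit, the new subscriber gets no
    // initial snapshot and only receives incremental updates from then on.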

    fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
        proto::tasks::TaskUpdate {
            new_tasks: self.tasks.as_proto_list(include, &self.base_time),
            stats_update: self.task_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
        }
    }

    fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
        proto::resources::ResourceUpdate {
            new_resources: self.resources.as_proto_list(include, &self.base_time),
            stats_update: self.resource_stats.as_proto(include, &self.base_time),
            new_poll_ops: std::mem::take(&mut self.poll_ops),
            dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
        }
    }

    fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
        proto::async_ops::AsyncOpUpdate {
            new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
            stats_update: self.async_op_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
        }
    }

    /// Add the task details subscription to the watchers after sending the first update,
    /// if the task is found.
    fn add_task_detail_subscription(
        &mut self,
        watch_request: WatchRequest<proto::tasks::TaskDetails>,
    ) {
        let WatchRequest {
            id,
            stream_sender,
            buffer,
        } = watch_request;
        tracing::debug!(id = ?id, "new task details subscription");
        if let Some(stats) = self.task_stats.get(&id) {
            let (tx, rx) = mpsc::channel(buffer);
            let subscription = Watch(tx);
            let now = Some(self.base_time.to_timestamp(Instant::now()));
            // Send back the stream receiver.
            // Then send the initial state --- if this fails, the subscription is already dead.
            if stream_sender.send(rx).is_ok()
                && subscription.update(&proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now,
                    poll_times_histogram: Some(stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
                })
            {
                self.details_watchers
                    .entry(id.clone())
                    .or_default()
                    .push(subscription);
            }
        }
        // If the task is not found, drop `stream_sender` which will result in a not found error
    }

    /// Add a state subscription to the watchers.
    fn add_state_subscription(&mut self, subscription: Watch<proto::instrument::State>) {
        self.state_watchers.push(subscription);
    }

    /// Publish the current state to all active state watchers.
    fn publish_state(&mut self) {
        let state = proto::instrument::State {
            temporality: self.temporality.into(),
        };
        self.state_watchers
            .retain_and_shrink(|watch| watch.update(&state));
    }

    /// Publish the current state to all active watchers.
    ///
    /// This drops any watchers which have closed the RPC, or whose update
    /// channel has filled up.
    fn publish(&mut self) {
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };

        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        let stats = &self.task_stats;
        // Assuming there are far fewer task details subscribers than there are
        // stats updates, iterate over `details_watchers` and compact the map.
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }

    /// Update the current state with data from a single event.
    fn update_state(&mut self, event: Event) {
        // do state update
        match event {
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }

            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                        // TODO: parents
                    },
                );

                self.task_stats.insert(id, stats);
            }

            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );

                self.resource_stats.insert(id, stats);
            }

            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // The CLI doesn't show historical poll ops, so don't save them if no one is watching.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };

                self.poll_ops.push(poll_op);
            }

            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );

                self.async_op_stats.insert(id, stats);
            }
        }
    }
}

/// Polls the event channel exactly once, without registering the aggregator
/// for wakeup: returns `None` if no event is ready, `Some(Some(event))` if an
/// event was buffered, and `Some(None)` if the channel has closed.
fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
    let waker = futures_task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);

    match receiver.poll_recv(&mut cx) {
        std::task::Poll::Ready(opt) => Some(opt),
        std::task::Poll::Pending => None,
    }
}
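
// A small sanity-check sketch of `recv_now_or_never` (added for illustration;
// not part of the upstream test suite). It exercises the three cases the
// helper can report, and needs no runtime because `poll_recv` is driven with
// a no-op waker.
#[cfg(test)]
mod recv_now_or_never_sketch {
    use super::recv_now_or_never;
    use tokio::sync::mpsc;

    #[test]
    fn reports_pending_ready_and_closed() {
        let (tx, mut rx) = mpsc::channel(4);

        // Nothing buffered yet: "pending" is reported as `None`.
        assert_eq!(recv_now_or_never(&mut rx), None);

        // A buffered value comes back as `Some(Some(value))`.
        tx.try_send(1u8).unwrap();
        assert_eq!(recv_now_or_never(&mut rx), Some(Some(1)));

        // Once every sender is gone and the buffer is drained, the closed
        // channel is reported as `Some(None)`.
        drop(tx);
        assert_eq!(recv_now_or_never(&mut rx), Some(None));
    }
}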

/// Count of events received in each aggregator drain cycle.
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}

impl EventCounts {
    fn new() -> Self {
        Self {
            async_resource_op: 0,
            metadata: 0,
            poll_op: 0,
            resource: 0,
            spawn: 0,
        }
    }

    /// Count the event based on its variant.
    fn update(&mut self, event: &Event) {
        match event {
            Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
            Event::Metadata(_) => self.metadata += 1,
            Event::PollOp { .. } => self.poll_op += 1,
            Event::Resource { .. } => self.resource += 1,
            Event::Spawn { .. } => self.spawn += 1,
        }
    }

    /// Total number of events recorded.
    fn total(&self) -> usize {
        self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
    }
}

// === impl Flush ===

impl Flush {
    pub(crate) fn trigger(&self) {
        if self
            .triggered
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_ok()
        {
            self.should_flush.notify_one();
        } else {
            // someone else already did it, that's fine...
        }
    }

    /// Indicates that the buffer has been successfully flushed.
    fn has_flushed(&self) {
        let _ = self
            .triggered
            .compare_exchange(true, false, AcqRel, Acquire);
    }
}
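
// Sketch of the coalescing handshake above (illustrative only): however many
// times the layer calls `trigger` while a flush is pending, the aggregator is
// notified once, and `has_flushed` re-arms the flag after the buffer has been
// drained.
//
//     let flush = Flush::default();
//     flush.trigger(); // flips `triggered` and wakes `should_flush.notified()`
//     flush.trigger(); // coalesced: the flag is already set, so no extra notify
//     /* ...the aggregator drains the event channel... */
//     flush.has_flushed(); // clears `triggered`; the next near-full buffer notifies again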

impl<T: Clone> Watch<T> {
    /// Sends `update` to this watcher if its channel has capacity, returning
    /// `false` if the watcher should be dropped (the channel is full or closed).
    fn update(&self, update: &T) -> bool {
        if let Ok(reserve) = self.0.try_reserve() {
            reserve.send(Ok(update.clone()));
            true
        } else {
            false
        }
    }
}
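
// Illustrative sketch of the backpressure behavior (not part of the module):
// a subscriber that never drains its stream eventually fails `try_reserve`,
// `update` returns `false`, and `publish` / `publish_state` drop the watcher
// via `retain_and_shrink`.
//
//     let (tx, _rx) = mpsc::channel(1);
//     let watch = Watch(tx);
//     assert!(watch.update(&1u64));  // fills the single-slot buffer
//     assert!(!watch.update(&2u64)); // no capacity left: treat the watcher as dead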

impl ToProto for Task {
    type Output = proto::tasks::Task;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::tasks::Task {
            id: Some(self.id.clone().into()),
            // TODO: more kinds of tasks...
            kind: proto::tasks::task::Kind::Spawn as i32,
            metadata: Some(self.metadata.into()),
            parents: Vec::new(), // TODO: implement parents nicely
            fields: self.fields.clone(),
            location: self.location.clone(),
        }
    }
}

impl Unsent for Task {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for Resource {
    type Output = proto::resources::Resource;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::resources::Resource {
            id: Some(self.id.clone().into()),
            parent_resource_id: self.parent_id.clone().map(Into::into),
            kind: Some(self.kind.clone()),
            metadata: Some(self.metadata.into()),
            concrete_type: self.concrete_type.clone(),
            location: self.location.clone(),
            is_internal: self.is_internal,
        }
    }
}

impl Unsent for Resource {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for AsyncOp {
    type Output = proto::async_ops::AsyncOp;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::async_ops::AsyncOp {
            id: Some(self.id.clone().into()),
            metadata: Some(self.metadata.into()),
            resource_id: Some(self.resource_id.clone().into()),
            source: self.source.clone(),
            parent_async_op_id: self.parent_id.clone().map(Into::into),
        }
    }
}

impl Unsent for AsyncOp {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}