hds_console_subscriber/aggregator/mod.rs
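//! Aggregation of instrumentation events.
//!
//! The `Aggregator` receives events emitted by the instrumentation layers,
//! maintains state for tasks, resources, and async operations, and
//! periodically publishes updates to subscribed instrument clients.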

use std::{
    sync::{
        atomic::{AtomicBool, Ordering::*},
        Arc,
    },
    time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
    stats::{self, Unsent},
    ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

/// Should match tonic's (private) `codec::DEFAULT_MAX_RECV_MESSAGE_SIZE`
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

/// Aggregates instrumentation traces and prepares state for the instrument
/// server.
///
/// The `Aggregator` is responsible for receiving and organizing the
/// instrumented events and preparing the data to be served to an instrument
/// client.
pub struct Aggregator {
    /// Channel of incoming events emitted by `TaskLayer`s.
    events: mpsc::Receiver<Event>,

    /// New incoming RPCs.
    rpcs: mpsc::Receiver<Command>,

    /// The interval at which new data updates are pushed to clients.
    publish_interval: Duration,

    /// How long to keep task data after a task has completed.
    retention: Duration,

    /// Shared state, including a `Notify` that triggers a flush when the event
    /// buffer is approaching capacity.
    shared: Arc<Shared>,

    /// Currently active RPCs streaming task events.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Currently active RPCs streaming task details events, by task ID.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// *All* metadata for task spans and user-defined spans that we care about.
    ///
    /// This is sent to new clients as part of the initial state.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// *New* metadata that was registered since the last state update.
    ///
    /// This is emptied on every state update.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Map of task IDs to task static data.
    tasks: IdData<Task>,

    /// Map of task IDs to task stats.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Map of resource IDs to resource static data.
    resources: IdData<Resource>,

    /// Map of resource IDs to resource stats.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Map of AsyncOp IDs to AsyncOp static data.
    async_ops: IdData<AsyncOp>,

    /// Map of AsyncOp IDs to AsyncOp stats.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// `PollOp` events that have occurred since the last update.
    ///
    /// This is emptied on every state update.
    poll_ops: Vec<proto::resources::PollOp>,

    /// The time "state" of the aggregator, such as paused or live.
    temporality: Temporality,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire.
    base_time: stats::TimeAnchor,
}

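/// Coordinates flushing of the event buffer.
///
/// `trigger` is called when the buffer is approaching capacity; concurrent
/// triggers are coalesced into a single notification until the aggregator
/// acknowledges the flush via `has_flushed`.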
#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
}

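/// Whether the aggregator is currently publishing updates to clients
/// (`Live`) or withholding them (`Paused`), as controlled by the `Pause`
/// and `Resume` commands.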
#[derive(Debug)]
enum Temporality {
    Live,
    Paused,
}

/// Represents static data for resources.
struct Resource {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}

/// Represents static data for tasks.
struct Task {
    id: Id,
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}

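/// Represents static data for async operations.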
struct AsyncOp {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}

impl Aggregator {
    pub(crate) fn new(
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
        shared: Arc<crate::Shared>,
        base_time: stats::TimeAnchor,
    ) -> Self {
        Self {
            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
            events,
            watchers: Default::default(),
            details_watchers: Default::default(),
            all_metadata: Default::default(),
            new_metadata: Default::default(),
            tasks: IdData::default(),
            task_stats: IdData::default(),
            resources: IdData::default(),
            resource_stats: IdData::default(),
            async_ops: IdData::default(),
            async_op_stats: IdData::default(),
            poll_ops: Default::default(),
            temporality: Temporality::Live,
            base_time,
        }
    }

    /// Runs the aggregator.
    ///
    /// This method will start the aggregator loop and should run as long as
    /// the instrument server is running. If the instrument server stops,
    /// this future can be aborted.
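    ///
    /// A minimal launch sketch (hypothetical wiring; the actual setup lives
    /// in this crate's builder path):
    ///
    /// ```ignore
    /// let aggregator = Aggregator::new(events, rpcs, &builder, shared, base_time);
    /// // Drive the aggregator alongside the instrument server...
    /// let handle = tokio::spawn(aggregator.run());
    /// // ...and abort it once the server has stopped.
    /// handle.abort();
    /// ```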
    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // if the flush interval elapses, flush data to the client
                _ = publish.tick() => {
                    match self.temporality {
                        Temporality::Live => true,
                        Temporality::Paused => false,
                    }
                }

                // triggered when the event buffer is approaching capacity
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // a new command from a client
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        },
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        },
                        Some(Command::Pause) => {
                            self.temporality = Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = Temporality::Live;
                        }
                        None => {
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };

                    false
                }
            };

            // drain and aggregate buffered events.
            //
            // Note: we *don't* want to actually await the call to `recv` --- we
            // don't want the aggregator task to be woken on every event,
            // because it will then be woken when its own `poll` calls are
            // exited. That would result in a busy-loop. Instead, we only want
            // to be woken when the flush interval has elapsed, or when the
            // channel is almost full.
            let mut drained = false;
            let mut counts = EventCounts::new();
            while let Some(event) = recv_now_or_never(&mut self.events) {
                match event {
                    Some(event) => {
                        counts.update(&event);
                        self.update_state(event);
                        drained = true;
                    }
                    // The channel closed, no more events will be emitted... time
                    // to stop aggregating.
                    None => {
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }
            tracing::debug!(
                async_resource_ops = counts.async_resource_op,
                metadatas = counts.metadata,
                poll_ops = counts.poll_op,
                resources = counts.resource,
                spawns = counts.spawn,
                total = counts.total(),
                "event channel drain loop",
            );

            // flush data to clients, if there are any currently subscribed
            // watchers and we should send a new update.
            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                self.shared.flush.has_flushed();
            }
        }
    }

    fn cleanup_closed(&mut self) {
        // drop all entries that have completed *and* whose final data has
        // already been sent off.
        let now = Instant::now();
        let has_watchers = !self.watchers.is_empty();
        self.tasks
            .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
        self.resources
            .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
        self.async_ops
            .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
        if !has_watchers {
            self.poll_ops.clear();
        }
    }

    /// Add the task subscription to the watchers after sending the first update.
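    ///
    /// If the initial update would exceed `MAX_MESSAGE_SIZE`, the retention
    /// period is halved and closed data is dropped until the update fits.
    /// Should retention reach the publish interval, the initial state is
    /// skipped entirely and the subscriber only receives incremental updates.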
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let now = Instant::now();

        let update = loop {
            let update = proto::instrument::Update {
                task_update: Some(self.task_update(Include::All)),
                resource_update: Some(self.resource_update(Include::All)),
                async_op_update: Some(self.async_op_update(Include::All)),
                now: Some(self.base_time.to_timestamp(now)),
                new_metadata: Some(proto::RegisterMetadata {
                    metadata: (*self.all_metadata).clone(),
                }),
            };
            let message_size = update.encoded_len();
            if message_size < MAX_MESSAGE_SIZE {
                // normal case
                break Some(update);
            }
            // If the gRPC message is bigger than tokio-console will accept, throw
            // away the oldest inactive data and try again
            self.retention /= 2;
            self.cleanup_closed();
            tracing::debug!(
                retention = ?self.retention,
                message_size,
                max_message_size = MAX_MESSAGE_SIZE,
                "Message too big, reduced retention",
            );

            if self.retention <= self.publish_interval {
                self.retention = self.publish_interval;
                break None;
            }
        };

        match update {
            // Send the initial state
            Some(update) => {
                if !subscription.update(&update) {
                    // If sending the initial update fails, the subscription is already dead,
                    // so don't add it to `watchers`.
                    return;
                }
            }
            // User will only get updates.
            None => tracing::error!(
                min_retention = ?self.publish_interval,
                "Message too big. Start with a smaller retention.",
            ),
        }

        self.watchers.push(subscription);
    }

    fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
        proto::tasks::TaskUpdate {
            new_tasks: self.tasks.as_proto_list(include, &self.base_time),
            stats_update: self.task_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
        }
    }

    fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
        proto::resources::ResourceUpdate {
            new_resources: self.resources.as_proto_list(include, &self.base_time),
            stats_update: self.resource_stats.as_proto(include, &self.base_time),
            new_poll_ops: std::mem::take(&mut self.poll_ops),
            dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
        }
    }

    fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
        proto::async_ops::AsyncOpUpdate {
            new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
            stats_update: self.async_op_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
        }
    }

    /// Add the task details subscription to the watchers after sending the first update,
    /// if the task is found.
    fn add_task_detail_subscription(
        &mut self,
        watch_request: WatchRequest<proto::tasks::TaskDetails>,
    ) {
        let WatchRequest {
            id,
            stream_sender,
            buffer,
        } = watch_request;
        tracing::debug!(id = ?id, "new task details subscription");
        if let Some(stats) = self.task_stats.get(&id) {
            let (tx, rx) = mpsc::channel(buffer);
            let subscription = Watch(tx);
            let now = Some(self.base_time.to_timestamp(Instant::now()));
            // Send back the stream receiver.
            // Then send the initial state --- if this fails, the subscription is already dead.
            if stream_sender.send(rx).is_ok()
                && subscription.update(&proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now,
                    poll_times_histogram: Some(stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
                })
            {
                self.details_watchers
                    .entry(id.clone())
                    .or_default()
                    .push(subscription);
            }
        }
        // If the task is not found, drop `stream_sender`, which will result in a
        // not-found error.
    }

    /// Publish the current state to all active watchers.
    ///
    /// This drops any watchers which have closed the RPC, or whose update
    /// channel has filled up.
    fn publish(&mut self) {
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };

        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        let stats = &self.task_stats;
        // Assuming there are far fewer task details subscribers than there are
        // stats updates, iterate over `details_watchers` and compact the map.
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }

    /// Update the current state with data from a single event.
    fn update_state(&mut self, event: Event) {
        // do state update
        match event {
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }

            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                        // TODO: parents
                    },
                );

                self.task_stats.insert(id, stats);
            }

            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );

                self.resource_stats.insert(id, stats);
            }

            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // The CLI doesn't show historical poll ops, so don't save them if
                // no one is watching.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };

                self.poll_ops.push(poll_op);
            }

            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );

                self.async_op_stats.insert(id, stats);
            }
        }
    }
}

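/// Polls the receiver once without registering for a wakeup.
///
/// Returns `Some(Some(event))` if an event is ready, `Some(None)` if the
/// channel has closed, and `None` if no event is currently available.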
fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
    let waker = futures_task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);

    match receiver.poll_recv(&mut cx) {
        std::task::Poll::Ready(opt) => Some(opt),
        std::task::Poll::Pending => None,
    }
}

/// Count of events received in each aggregator drain cycle.
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}

impl EventCounts {
    fn new() -> Self {
        Self {
            async_resource_op: 0,
            metadata: 0,
            poll_op: 0,
            resource: 0,
            spawn: 0,
        }
    }

    /// Count the event based on its variant.
    fn update(&mut self, event: &Event) {
        match event {
            Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
            Event::Metadata(_) => self.metadata += 1,
            Event::PollOp { .. } => self.poll_op += 1,
            Event::Resource { .. } => self.resource += 1,
            Event::Spawn { .. } => self.spawn += 1,
        }
    }

    /// Total number of events recorded.
    fn total(&self) -> usize {
        self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
    }
}

// ==== impl Flush ====

impl Flush {
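    /// Notifies the aggregator that the event buffer should be flushed, unless
    /// a flush has already been triggered and not yet acknowledged.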
    pub(crate) fn trigger(&self) {
        if self
            .triggered
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_ok()
        {
            self.should_flush.notify_one();
        } else {
            // someone else already did it, that's fine...
        }
    }

    /// Indicates that the buffer has been successfully flushed.
    fn has_flushed(&self) {
        let _ = self
            .triggered
            .compare_exchange(true, false, AcqRel, Acquire);
    }
}

impl<T: Clone> Watch<T> {
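    /// Sends `update` to this watcher, returning `true` on success and `false`
    /// if the channel is full or closed.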
    fn update(&self, update: &T) -> bool {
        if let Ok(reserve) = self.0.try_reserve() {
            reserve.send(Ok(update.clone()));
            true
        } else {
            false
        }
    }
}

impl ToProto for Task {
    type Output = proto::tasks::Task;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::tasks::Task {
            id: Some(self.id.clone().into()),
            // TODO: more kinds of tasks...
            kind: proto::tasks::task::Kind::Spawn as i32,
            metadata: Some(self.metadata.into()),
            parents: Vec::new(), // TODO: implement parents nicely
            fields: self.fields.clone(),
            location: self.location.clone(),
        }
    }
}

impl Unsent for Task {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for Resource {
    type Output = proto::resources::Resource;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::resources::Resource {
            id: Some(self.id.clone().into()),
            parent_resource_id: self.parent_id.clone().map(Into::into),
            kind: Some(self.kind.clone()),
            metadata: Some(self.metadata.into()),
            concrete_type: self.concrete_type.clone(),
            location: self.location.clone(),
            is_internal: self.is_internal,
        }
    }
}

impl Unsent for Resource {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for AsyncOp {
    type Output = proto::async_ops::AsyncOp;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::async_ops::AsyncOp {
            id: Some(self.id.clone().into()),
            metadata: Some(self.metadata.into()),
            resource_id: Some(self.resource_id.clone().into()),
            source: self.source.clone(),
            parent_async_op_id: self.parent_id.clone().map(Into::into),
        }
    }
}

impl Unsent for AsyncOp {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}