1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering::*},
4 Arc,
5 },
6 time::{Duration, Instant},
7};
8
9use console_api as proto;
10use prost::Message;
11use proto::resources::resource;
12use tokio::sync::{mpsc, Notify};
13use tracing_core::{span::Id, Metadata};
14
15use super::{Command, Event, Shared, Watch};
16use crate::{
17 stats::{self, Unsent},
18 ToProto, WatchRequest,
19};
20
21mod id_data;
22mod shrink;
23use self::id_data::{IdData, Include};
24use self::shrink::{ShrinkMap, ShrinkVec};
25
/// Hard cap (4 MiB) on the encoded size of a single instrument update;
/// `add_instrument_subscription` halves retention until the initial
/// snapshot fits under this limit.
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
28
/// Aggregates instrumentation events and publishes periodic updates to
/// subscribed console clients.
pub struct Aggregator {
    /// Instrumentation events received from the layer.
    events: mpsc::Receiver<Event>,

    /// Commands (subscriptions, pause/resume) received from the gRPC server.
    rpcs: mpsc::Receiver<Command>,

    /// How often updates are published to watchers.
    publish_interval: Duration,

    /// How long data about closed spans is retained before being dropped.
    retention: Duration,

    /// State shared with the layer: the flush notification and
    /// dropped-event counters.
    shared: Arc<Shared>,

    /// Active subscribers to the instrument update stream.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Subscribers to per-task detail streams, keyed by task span ID.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// All metadata seen so far; sent as a snapshot to new subscribers.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// Metadata registered since the last publish; drained on each publish.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Known tasks, by span ID.
    tasks: IdData<Task>,

    /// Per-task statistics, by span ID.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Known resources, by span ID.
    resources: IdData<Resource>,

    /// Per-resource statistics, by span ID.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Known async ops, by span ID.
    async_ops: IdData<AsyncOp>,

    /// Per-async-op statistics, by span ID.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// Poll ops not yet published; only buffered while watchers exist, and
    /// drained on each resource update.
    poll_ops: Vec<proto::resources::PollOp>,

    /// Whether updates are currently being published (`Live`) or not
    /// (`Paused`).
    temporality: Temporality,

    /// Anchor for converting `Instant`s into wall-clock timestamps.
    base_time: stats::TimeAnchor,
}
98
/// Notification used by the layer to ask the aggregator to drain its event
/// buffer before it overflows.
#[derive(Debug, Default)]
pub(crate) struct Flush {
    /// Notified when a flush is requested; awaited in `Aggregator::run`.
    pub(crate) should_flush: Notify,
    /// Set while a flush request is pending, so `trigger` notifies at most
    /// once per flush cycle.
    triggered: AtomicBool,
}
104
/// Whether the aggregator is currently publishing updates to watchers.
#[derive(Debug)]
enum Temporality {
    /// Updates are published on every publish-interval tick.
    Live,
    /// Publishing is suspended; events are still drained and recorded.
    Paused,
}
/// Data recorded for a resource span, used to build its protobuf
/// representation (see `ToProto for Resource`).
struct Resource {
    /// The resource's span ID.
    id: Id,
    /// Set when the resource has changes not yet sent to watchers; cleared
    /// by `Unsent::take_unsent`.
    is_dirty: AtomicBool,
    /// Span ID of the parent resource, if any.
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}
121
/// Data recorded for a spawned task span, used to build its protobuf
/// representation (see `ToProto for Task`).
struct Task {
    /// The task's span ID.
    id: Id,
    /// Set when the task has changes not yet sent to watchers; cleared by
    /// `Unsent::take_unsent`.
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}
130
/// Data recorded for an async op span, used to build its protobuf
/// representation (see `ToProto for AsyncOp`).
struct AsyncOp {
    /// The async op's span ID.
    id: Id,
    /// Set when the op has changes not yet sent to watchers; cleared by
    /// `Unsent::take_unsent`.
    is_dirty: AtomicBool,
    /// Span ID of the parent async op, if any.
    parent_id: Option<Id>,
    /// Span ID of the resource this op belongs to.
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}
139
140impl Aggregator {
141 pub(crate) fn new(
142 events: mpsc::Receiver<Event>,
143 rpcs: mpsc::Receiver<Command>,
144 builder: &crate::Builder,
145 shared: Arc<crate::Shared>,
146 base_time: stats::TimeAnchor,
147 ) -> Self {
148 Self {
149 shared,
150 rpcs,
151 publish_interval: builder.publish_interval,
152 retention: builder.retention,
153 events,
154 watchers: Default::default(),
155 details_watchers: Default::default(),
156 all_metadata: Default::default(),
157 new_metadata: Default::default(),
158 tasks: IdData::default(),
159 task_stats: IdData::default(),
160 resources: IdData::default(),
161 resource_stats: IdData::default(),
162 async_ops: IdData::default(),
163 async_op_stats: IdData::default(),
164 poll_ops: Default::default(),
165 temporality: Temporality::Live,
166 base_time,
167 }
168 }
169
    /// Runs the aggregator loop.
    ///
    /// Each iteration waits on one of three wake sources: the publish
    /// interval timer, the layer's flush notification, or an incoming RPC
    /// command. After every wakeup it drains all currently buffered events,
    /// then publishes to watchers (only when woken by the timer while
    /// `Live`) and cleans up expired closed spans.
    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // Timer tick: publish only while live, stay quiet when paused.
                _ = publish.tick() => {
                    match self.temporality {
                        Temporality::Live => true,
                        Temporality::Paused => false,
                    }
                }

                // The layer's event buffer is approaching capacity: wake up
                // to drain it now, without publishing.
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // A command from the gRPC server.
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        },
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        },
                        Some(Command::Pause) => {
                            self.temporality = Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = Temporality::Live;
                        }
                        None => {
                            // All RPC senders are gone; the server shut down.
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };

                    false
                }

            };

            // Drain every event already buffered, without awaiting, so the
            // channel is emptied in a single non-blocking pass.
            let mut drained = false;
            let mut counts = EventCounts::new();
            while let Some(event) = recv_now_or_never(&mut self.events) {
                match event {
                    Some(event) => {
                        counts.update(&event);
                        self.update_state(event);
                        drained = true;
                    }
                    None => {
                        // Event channel closed: the layer has been dropped.
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }
            tracing::debug!(
                async_resource_ops = counts.async_resource_op,
                metadatas = counts.metadata,
                poll_ops = counts.poll_op,
                resources = counts.resource,
                spawns = counts.spawn,
                total = counts.total(),
                "event channel drain loop",
            );

            // Skip publishing entirely when nobody is watching.
            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                // Re-arm the flush notification now that the buffer is empty.
                self.shared.flush.has_flushed();
            }
        }
    }
265
266 fn cleanup_closed(&mut self) {
267 let now = Instant::now();
270 let has_watchers = !self.watchers.is_empty();
271 self.tasks
272 .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
273 self.resources
274 .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
275 self.async_ops
276 .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
277 if !has_watchers {
278 self.poll_ops.clear();
279 }
280 }
281
    /// Adds a new subscriber to the instrument update stream.
    ///
    /// Builds a complete snapshot (all tasks, resources, async ops, and
    /// metadata) for the new watcher. If the encoded snapshot exceeds
    /// `MAX_MESSAGE_SIZE`, retention is repeatedly halved and expired data
    /// dropped until the snapshot fits; if retention bottoms out at the
    /// publish interval and the snapshot still doesn't fit, no initial
    /// update is sent (but the watcher is still registered for incremental
    /// updates).
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let now = Instant::now();

        let update = loop {
            let update = proto::instrument::Update {
                task_update: Some(self.task_update(Include::All)),
                resource_update: Some(self.resource_update(Include::All)),
                async_op_update: Some(self.async_op_update(Include::All)),
                now: Some(self.base_time.to_timestamp(now)),
                new_metadata: Some(proto::RegisterMetadata {
                    metadata: (*self.all_metadata).clone(),
                }),
            };
            let message_size = update.encoded_len();
            if message_size < MAX_MESSAGE_SIZE {
                break Some(update);
            }
            // Snapshot too large: halve retention, drop newly expired data,
            // and try again.
            self.retention /= 2;
            self.cleanup_closed();
            tracing::debug!(
                retention = ?self.retention,
                message_size,
                max_message_size = MAX_MESSAGE_SIZE,
                "Message too big, reduced retention",
            );

            if self.retention <= self.publish_interval {
                // Can't shrink below the publish interval; clamp retention
                // and give up on sending an initial snapshot.
                self.retention = self.publish_interval;
                break None;
            }
        };

        match update {
            Some(update) => {
                // If the initial update can't be delivered, the watcher is
                // already gone — don't register it.
                if !subscription.update(&update) {
                    return;
                }
            }
            None => tracing::error!(
                min_retention = ?self.publish_interval,
                "Message too big. Start with smaller retention.",
            ),
        }

        self.watchers.push(subscription);
    }
337
338 fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
339 proto::tasks::TaskUpdate {
340 new_tasks: self.tasks.as_proto_list(include, &self.base_time),
341 stats_update: self.task_stats.as_proto(include, &self.base_time),
342 dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
343 another_field: 0,
344 }
345 }
346
347 fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
348 proto::resources::ResourceUpdate {
349 new_resources: self.resources.as_proto_list(include, &self.base_time),
350 stats_update: self.resource_stats.as_proto(include, &self.base_time),
351 new_poll_ops: std::mem::take(&mut self.poll_ops),
352 dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
353 }
354 }
355
356 fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
357 proto::async_ops::AsyncOpUpdate {
358 new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
359 stats_update: self.async_op_stats.as_proto(include, &self.base_time),
360 dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
361 }
362 }
363
364 fn add_task_detail_subscription(
367 &mut self,
368 watch_request: WatchRequest<proto::tasks::TaskDetails>,
369 ) {
370 let WatchRequest {
371 id,
372 stream_sender,
373 buffer,
374 } = watch_request;
375 tracing::debug!(id = ?id, "new task details subscription");
376 if let Some(stats) = self.task_stats.get(&id) {
377 let (tx, rx) = mpsc::channel(buffer);
378 let subscription = Watch(tx);
379 let now = Some(self.base_time.to_timestamp(Instant::now()));
380 if stream_sender.send(rx).is_ok()
383 && subscription.update(&proto::tasks::TaskDetails {
384 task_id: Some(id.clone().into()),
385 now,
386 poll_times_histogram: Some(stats.poll_duration_histogram()),
387 scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
388 })
389 {
390 self.details_watchers
391 .entry(id.clone())
392 .or_default()
393 .push(subscription);
394 }
395 }
396 }
398
    /// Publishes an incremental update (changed data only) to all watchers,
    /// dropping any watcher whose channel is closed or full.
    fn publish(&mut self) {
        // Only include metadata registered since the last publish.
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };

        // Retain only watchers that accepted the update, shrinking the
        // backing storage when many are dropped.
        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        let stats = &self.task_stats;
        // Send per-task detail updates; drop watcher lists whose task no
        // longer has stats, and individual watchers that reject the update.
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }
444
    /// Applies a single instrumentation event to the aggregator's internal
    /// state.
    fn update_state(&mut self, event: Event) {
        match event {
            // New metadata: record it both for the full snapshot sent to
            // new subscribers and for the next incremental publish.
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }

            // A new task was spawned.
            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        // Marked dirty so it appears in the next update.
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                    },
                );

                self.task_stats.insert(id, stats);
            }

            // A new resource was created.
            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );

                self.resource_stats.insert(id, stats);
            }

            // A poll op occurred on a resource.
            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // Poll ops are only buffered while someone is watching;
                // otherwise the buffer would grow without bound.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };

                self.poll_ops.push(poll_op);
            }

            // A new async op was created on a resource.
            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );

                self.async_op_stats.insert(id, stats);
            }
        }
    }
551}
552
553fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
554 let waker = futures_task::noop_waker();
555 let mut cx = std::task::Context::from_waker(&waker);
556
557 match receiver.poll_recv(&mut cx) {
558 std::task::Poll::Ready(opt) => Some(opt),
559 std::task::Poll::Pending => None,
560 }
561}
562
/// Per-category counts of events drained in one pass of the aggregator's
/// drain loop, reported via debug logging in `Aggregator::run`.
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}
571
572impl EventCounts {
573 fn new() -> Self {
574 Self {
575 async_resource_op: 0,
576 metadata: 0,
577 poll_op: 0,
578 resource: 0,
579 spawn: 0,
580 }
581 }
582
583 fn update(&mut self, event: &Event) {
585 match event {
586 Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
587 Event::Metadata(_) => self.metadata += 1,
588 Event::PollOp { .. } => self.poll_op += 1,
589 Event::Resource { .. } => self.resource += 1,
590 Event::Spawn { .. } => self.spawn += 1,
591 }
592 }
593
594 fn total(&self) -> usize {
596 self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
597 }
598}
599
600impl Flush {
603 pub(crate) fn trigger(&self) {
604 if self
605 .triggered
606 .compare_exchange(false, true, AcqRel, Acquire)
607 .is_ok()
608 {
609 self.should_flush.notify_one();
610 } else {
611 }
613 }
614
615 fn has_flushed(&self) {
617 let _ = self
618 .triggered
619 .compare_exchange(true, false, AcqRel, Acquire);
620 }
621}
622
623impl<T: Clone> Watch<T> {
624 fn update(&self, update: &T) -> bool {
625 if let Ok(reserve) = self.0.try_reserve() {
626 reserve.send(Ok(update.clone()));
627 true
628 } else {
629 false
630 }
631 }
632}
633
634impl ToProto for Task {
635 type Output = proto::tasks::Task;
636
637 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
638 proto::tasks::Task {
639 id: Some(self.id.clone().into()),
640 kind: proto::tasks::task::Kind::Spawn as i32,
642 metadata: Some(self.metadata.into()),
643 parents: Vec::new(), fields: self.fields.clone(),
645 location: self.location.clone(),
646 }
647 }
648}
649
650impl Unsent for Task {
651 fn take_unsent(&self) -> bool {
652 self.is_dirty.swap(false, AcqRel)
653 }
654
655 fn is_unsent(&self) -> bool {
656 self.is_dirty.load(Acquire)
657 }
658}
659
660impl ToProto for Resource {
661 type Output = proto::resources::Resource;
662
663 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
664 proto::resources::Resource {
665 id: Some(self.id.clone().into()),
666 parent_resource_id: self.parent_id.clone().map(Into::into),
667 kind: Some(self.kind.clone()),
668 metadata: Some(self.metadata.into()),
669 concrete_type: self.concrete_type.clone(),
670 location: self.location.clone(),
671 is_internal: self.is_internal,
672 }
673 }
674}
675
676impl Unsent for Resource {
677 fn take_unsent(&self) -> bool {
678 self.is_dirty.swap(false, AcqRel)
679 }
680
681 fn is_unsent(&self) -> bool {
682 self.is_dirty.load(Acquire)
683 }
684}
685
686impl ToProto for AsyncOp {
687 type Output = proto::async_ops::AsyncOp;
688
689 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
690 proto::async_ops::AsyncOp {
691 id: Some(self.id.clone().into()),
692 metadata: Some(self.metadata.into()),
693 resource_id: Some(self.resource_id.clone().into()),
694 source: self.source.clone(),
695 parent_async_op_id: self.parent_id.clone().map(Into::into),
696 }
697 }
698}
699
700impl Unsent for AsyncOp {
701 fn take_unsent(&self) -> bool {
702 self.is_dirty.swap(false, AcqRel)
703 }
704
705 fn is_unsent(&self) -> bool {
706 self.is_dirty.load(Acquire)
707 }
708}