use std::{
    sync::{
        atomic::{AtomicBool, Ordering::*},
        Arc,
    },
    time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
    stats::{self, Unsent},
    ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

// Hard cap on the encoded size of a single instrument update (4 MiB),
// presumably chosen to stay under typical gRPC per-message size limits.
// Snapshots that would exceed this cause the retention period to be reduced.
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

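/// Aggregates instrumentation events into published state: drains the event
/// channel, maintains per-task, per-resource, and per-async-op data and
/// stats, and pushes periodic updates to subscribed watchers.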
pub struct Aggregator {
    /// Channel of incoming instrumentation events.
    events: mpsc::Receiver<Event>,

    /// Channel of incoming RPC commands from the instrument server.
    rpcs: mpsc::Receiver<Command>,

    /// How often to publish updates to watchers.
    publish_interval: Duration,

    /// How long to retain data about completed tasks, resources, and async
    /// ops after they close.
    retention: Duration,

    /// Shared state, including the flush latch and dropped-event counters.
    shared: Arc<Shared>,

    /// Currently active state (temporality) watchers.
    state_watchers: ShrinkVec<Watch<proto::instrument::State>>,

    /// Currently active instrument-update watchers.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Currently active task-details watchers, keyed by task span ID.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// All metadata seen so far, sent in full to new subscribers.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// Metadata not yet sent to existing subscribers.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Static data about tasks, keyed by span ID.
    tasks: IdData<Task>,

    /// Stats for tasks, keyed by span ID.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Static data about resources, keyed by span ID.
    resources: IdData<Resource>,

    /// Stats for resources, keyed by span ID.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Static data about async operations, keyed by span ID.
    async_ops: IdData<AsyncOp>,

    /// Stats for async operations, keyed by span ID.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// Poll ops buffered since the last published update.
    poll_ops: Vec<proto::resources::PollOp>,

    /// Whether updates are currently streamed live or paused.
    temporality: proto::instrument::Temporality,

    /// Anchor used to convert `Instant`s into wall-clock timestamps.
    base_time: stats::TimeAnchor,
}

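/// Flush coordination between the event buffer and the aggregator: `trigger`
/// latches `triggered` and wakes the aggregator at most once per drain, and
/// `has_flushed` re-arms the latch after the buffer has been drained.
///
/// The latch is a `compare_exchange` on an `AtomicBool`. A minimal
/// standalone sketch of the same pattern (std types only):
///
/// ```
/// use std::sync::atomic::{AtomicBool, Ordering::*};
///
/// let latch = AtomicBool::new(false);
/// // The first trigger wins and would notify the aggregator...
/// assert!(latch.compare_exchange(false, true, AcqRel, Acquire).is_ok());
/// // ...subsequent triggers are debounced until the latch is reset.
/// assert!(latch.compare_exchange(false, true, AcqRel, Acquire).is_err());
/// // After draining, the aggregator re-arms the latch.
/// assert!(latch.compare_exchange(true, false, AcqRel, Acquire).is_ok());
/// ```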
#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
}

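/// Static data about a resource span; its time-varying stats are kept
/// separately, in `Aggregator::resource_stats`.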
struct Resource {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}

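/// Static data about a spawned task; its time-varying stats are kept
/// separately, in `Aggregator::task_stats`.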
struct Task {
    id: Id,
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}

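/// Static data about an async operation on a resource; its time-varying
/// stats are kept separately, in `Aggregator::async_op_stats`.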
struct AsyncOp {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}

impl Aggregator {
    pub(crate) fn new(
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
        shared: Arc<crate::Shared>,
        base_time: stats::TimeAnchor,
    ) -> Self {
        Self {
            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
            events,
            watchers: Default::default(),
            details_watchers: Default::default(),
            state_watchers: Default::default(),
            all_metadata: Default::default(),
            new_metadata: Default::default(),
            tasks: IdData::default(),
            task_stats: IdData::default(),
            resources: IdData::default(),
            resource_stats: IdData::default(),
            async_ops: IdData::default(),
            async_op_stats: IdData::default(),
            poll_ops: Default::default(),
            temporality: proto::instrument::Temporality::Live,
            base_time,
        }
    }

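    /// Runs the aggregator.
    ///
    /// Each iteration of the main loop waits for one of three wakeups: the
    /// publish interval ticking (publish an update, unless paused), the
    /// event buffer approaching capacity (drain now, but publish on
    /// schedule), or an RPC command arriving (subscribe, pause, or resume).
    /// The loop then drains all currently buffered events, publishes to
    /// watchers if due, and drops data that has outlived the retention
    /// period.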
    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // The publish interval elapsed: send an update, unless
                // updates are currently paused.
                _ = publish.tick() => {
                    match self.temporality {
                        proto::instrument::Temporality::Live => true,
                        proto::instrument::Temporality::Paused => false,
                    }
                }

                // The event buffer is approaching capacity: drain it now,
                // but don't publish an update off-schedule.
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // A new command arrived on the RPC channel.
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        }
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        }
                        Some(Command::WatchState(subscription)) => {
                            self.add_state_subscription(subscription);
                        }
                        Some(Command::Pause) => {
                            self.temporality = proto::instrument::Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = proto::instrument::Temporality::Live;
                        }
                        None => {
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };

                    false
                }
            };

            // Drain any events buffered since the last iteration, without
            // blocking on an empty channel.
            let mut drained = false;
            let mut counts = EventCounts::new();
            while let Some(event) = recv_now_or_never(&mut self.events) {
                match event {
                    Some(event) => {
                        counts.update(&event);
                        self.update_state(event);
                        drained = true;
                    }
                    None => {
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }
            tracing::debug!(
                async_resource_ops = counts.async_resource_op,
                metadatas = counts.metadata,
                poll_ops = counts.poll_op,
                resources = counts.resource,
                spawns = counts.spawn,
                total = counts.total(),
                "event channel drain loop",
            );

            if !self.state_watchers.is_empty() {
                self.publish_state();
            }

            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                self.shared.flush.has_flushed();
            }
        }
    }

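    /// Drops closed tasks, resources, and async ops that have outlived the
    /// retention period. When there are no active watchers, closed data
    /// (and any buffered poll ops) need not be retained at all.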
    fn cleanup_closed(&mut self) {
        let now = Instant::now();
        let has_watchers = !self.watchers.is_empty();
        self.tasks
            .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
        self.resources
            .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
        self.async_ops
            .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
        if !has_watchers {
            self.poll_ops.clear();
        }
    }

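    /// Adds a new instrument subscription, sending it a snapshot of the
    /// current state. If the snapshot encodes larger than
    /// `MAX_MESSAGE_SIZE`, the retention period is halved (dropping newly
    /// expired data) until the update fits, bottoming out at the publish
    /// interval.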
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let now = Instant::now();

        let update = loop {
            let update = proto::instrument::Update {
                task_update: Some(self.task_update(Include::All)),
                resource_update: Some(self.resource_update(Include::All)),
                async_op_update: Some(self.async_op_update(Include::All)),
                now: Some(self.base_time.to_timestamp(now)),
                new_metadata: Some(proto::RegisterMetadata {
                    metadata: (*self.all_metadata).clone(),
                }),
            };
            let message_size = update.encoded_len();
            if message_size < MAX_MESSAGE_SIZE {
                break Some(update);
            }
            // The initial update is too large to encode: halve the retention
            // period, drop newly expired data, and try again.
            self.retention /= 2;
            self.cleanup_closed();
            tracing::debug!(
                retention = ?self.retention,
                message_size,
                max_message_size = MAX_MESSAGE_SIZE,
                "Message too big, reduced retention",
            );

            if self.retention <= self.publish_interval {
                self.retention = self.publish_interval;
                break None;
            }
        };

        match update {
            Some(update) => {
                // Skip registering the subscription if the initial update
                // could not be sent (e.g. the watcher already hung up).
                if !subscription.update(&update) {
                    return;
                }
            }
            None => tracing::error!(
                min_retention = ?self.publish_interval,
                "Message too big. Start with smaller retention.",
            ),
        }

        self.watchers.push(subscription);
    }

    fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
        proto::tasks::TaskUpdate {
            new_tasks: self.tasks.as_proto_list(include, &self.base_time),
            stats_update: self.task_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
        }
    }

    fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
        proto::resources::ResourceUpdate {
            new_resources: self.resources.as_proto_list(include, &self.base_time),
            stats_update: self.resource_stats.as_proto(include, &self.base_time),
            new_poll_ops: std::mem::take(&mut self.poll_ops),
            dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
        }
    }

    fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
        proto::async_ops::AsyncOpUpdate {
            new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
            stats_update: self.async_op_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
        }
    }

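    /// Adds a new watcher for the details of a single task. The watcher is
    /// registered only if the task is known and the initial details update
    /// can be delivered.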
    fn add_task_detail_subscription(
        &mut self,
        watch_request: WatchRequest<proto::tasks::TaskDetails>,
    ) {
        let WatchRequest {
            id,
            stream_sender,
            buffer,
        } = watch_request;
        tracing::debug!(id = ?id, "new task details subscription");
        if let Some(stats) = self.task_stats.get(&id) {
            let (tx, rx) = mpsc::channel(buffer);
            let subscription = Watch(tx);
            let now = Some(self.base_time.to_timestamp(Instant::now()));
            // Hand the receiver back to the requester, then send the initial
            // details update; only register the watcher if both succeed.
            if stream_sender.send(rx).is_ok()
                && subscription.update(&proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now,
                    poll_times_histogram: Some(stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
                })
            {
                self.details_watchers
                    .entry(id.clone())
                    .or_default()
                    .push(subscription);
            }
        }
    }

    fn add_state_subscription(&mut self, subscription: Watch<proto::instrument::State>) {
        self.state_watchers.push(subscription);
    }

    fn publish_state(&mut self) {
        let state = proto::instrument::State {
            temporality: self.temporality.into(),
        };
        self.state_watchers
            .retain_and_shrink(|watch| watch.update(&state));
    }

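    /// Publishes an incremental update (only what changed since the last
    /// publish) to all instrument watchers and task-details watchers,
    /// dropping any watcher whose channel is closed or full.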
    fn publish(&mut self) {
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };

        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        let stats = &self.task_stats;
        // Send task details updates; drop any subscriptions whose watchers
        // have closed, or whose tasks no longer exist.
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }

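    /// Applies a single incoming event to the aggregated state.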
    fn update_state(&mut self, event: Event) {
        match event {
            // New metadata was registered: keep it both for new subscribers
            // (`all_metadata`) and the next incremental update
            // (`new_metadata`).
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }

            // A new task was spawned.
            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                    },
                );

                self.task_stats.insert(id, stats);
            }

            // A new resource was created.
            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );

                self.resource_stats.insert(id, stats);
            }

            // A poll op occurred on a resource.
            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // Poll ops are only buffered while there is at least one
                // watcher to receive them; otherwise they would accumulate
                // unboundedly between publishes.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };

                self.poll_ops.push(poll_op);
            }

            // A new async op was created on a resource.
            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );

                self.async_op_stats.insert(id, stats);
            }
        }
    }
}

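/// Polls `receiver` exactly once, using a no-op waker, so it never blocks or
/// registers for wakeup. Returns `Some(Some(value))` if a value is buffered,
/// `Some(None)` if the channel is closed, and `None` if the channel is empty
/// but still open.
///
/// This is what lets `run` drain the event channel opportunistically. A
/// usage sketch (this helper is private to this module, so the example is
/// not compiled):
///
/// ```ignore
/// while let Some(maybe_event) = recv_now_or_never(&mut receiver) {
///     match maybe_event {
///         Some(event) => { /* handle a buffered event */ }
///         None => return, // the channel is closed
///     }
/// }
/// // Otherwise the channel is empty for now; return to the select loop.
/// ```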
fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
    let waker = futures_task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);

    match receiver.poll_recv(&mut cx) {
        std::task::Poll::Ready(opt) => Some(opt),
        std::task::Poll::Pending => None,
    }
}

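/// Tallies of each event kind seen during one drain of the event channel,
/// used for the debug log line emitted after the drain loop.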
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}

impl EventCounts {
    fn new() -> Self {
        Self {
            async_resource_op: 0,
            metadata: 0,
            poll_op: 0,
            resource: 0,
            spawn: 0,
        }
    }

    /// Records an event of the given kind.
    fn update(&mut self, event: &Event) {
        match event {
            Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
            Event::Metadata(_) => self.metadata += 1,
            Event::PollOp { .. } => self.poll_op += 1,
            Event::Resource { .. } => self.resource += 1,
            Event::Spawn { .. } => self.spawn += 1,
        }
    }

    /// Total number of events counted across all kinds.
    fn total(&self) -> usize {
        self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
    }
}

impl Flush {
    /// Wakes the aggregator to drain the event buffer, unless a flush has
    /// already been triggered and not yet completed.
    pub(crate) fn trigger(&self) {
        if self
            .triggered
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_ok()
        {
            self.should_flush.notify_one();
        }
        // Otherwise, a flush was already triggered and hasn't completed
        // yet; there's no need to notify again.
    }

    /// Called by the aggregator after draining the buffer, re-arming the
    /// flush latch.
    fn has_flushed(&self) {
        let _ = self
            .triggered
            .compare_exchange(true, false, AcqRel, Acquire);
    }
}

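/// `update` is the single send path used by the aggregator: it never
/// blocks, and it reports a full or closed channel as `false`, which callers
/// treat as "drop this subscription".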
impl<T: Clone> Watch<T> {
    fn update(&self, update: &T) -> bool {
        if let Ok(reserve) = self.0.try_reserve() {
            reserve.send(Ok(update.clone()));
            true
        } else {
            false
        }
    }
}

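// The `ToProto` impls below convert each kind of static span data into its
// wire type; the time-varying stats are converted separately, via the stats
// maps. The matching `Unsent` impls provide dirty-flag tracking:
// `take_unsent` atomically clears the flag, so each change is sent at most
// once.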
impl ToProto for Task {
    type Output = proto::tasks::Task;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::tasks::Task {
            id: Some(self.id.clone().into()),
            kind: proto::tasks::task::Kind::Spawn as i32,
            metadata: Some(self.metadata.into()),
            parents: Vec::new(),
            fields: self.fields.clone(),
            location: self.location.clone(),
        }
    }
}

impl Unsent for Task {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for Resource {
    type Output = proto::resources::Resource;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::resources::Resource {
            id: Some(self.id.clone().into()),
            parent_resource_id: self.parent_id.clone().map(Into::into),
            kind: Some(self.kind.clone()),
            metadata: Some(self.metadata.into()),
            concrete_type: self.concrete_type.clone(),
            location: self.location.clone(),
            is_internal: self.is_internal,
        }
    }
}

impl Unsent for Resource {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for AsyncOp {
    type Output = proto::async_ops::AsyncOp;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::async_ops::AsyncOp {
            id: Some(self.id.clone().into()),
            metadata: Some(self.metadata.into()),
            resource_id: Some(self.resource_id.clone().into()),
            source: self.source.clone(),
            parent_async_op_id: self.parent_id.clone().map(Into::into),
        }
    }
}

impl Unsent for AsyncOp {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}