1use std::{
5 sync::{Arc, Mutex},
6 thread::JoinHandle,
7 time::Duration,
8};
9
10use crate::{
11 IndexMap, IndexSet, Instant,
12 allocator::{self, AllocationGroup, GroupAllocationStatistics, QueueAllocTracker, no_track},
13 config::ProfileConfig,
14 events::{SpanClosedEvent, SpanMemoryUpdateEvent},
15 reporter::{ProfileReporter, SpanAllocations, SpanCpuTime, SpanMetadata, SpanWallTime},
16 sampler::{AlwaysSample, HeadSampler},
17};
18use crate::{
19 events::{EventQueue, SpanEvent, SpanEventKind},
20 reporter::SpanHeap,
21};
22use tracing::{Subscriber, field::Visit, span};
23use tracing_subscriber::{
24 Layer,
25 registry::{LookupSpan, SpanRef},
26};
27
/// A `tracing_subscriber` layer that measures per-span wall time, CPU time and
/// allocations, and hands the results to a [`ProfileReporter`] running on a
/// background reporter thread.
///
/// `H` is the head sampler consulted for each new span; it defaults to
/// [`AlwaysSample`].
pub struct ProfileLayer<R, H = AlwaysSample>
where
    R: ProfileReporter + Send + Sync + 'static,
{
    /// Profiling options (enabled measurements, label filters, heap limits, ...).
    config: ProfileConfig,
    /// Span creation / wall-time / CPU-time events consumed by the reporter thread.
    span_events: Arc<EventQueue<SpanEvent>>,
    /// Span close notifications consumed by the reporter thread.
    span_closed_events: Arc<EventQueue<SpanClosedEvent>>,
    /// Allocation statistics pushed by the global allocation tracker installed in `on_layer`.
    memory_events: Arc<EventQueue<SpanMemoryUpdateEvent>>,
    /// Shutdown signal: a value pushed here makes the reporter thread exit its loop.
    close_events: Arc<EventQueue<()>>,
    /// Reporter thread handle, shared with every [`ShutdownHandle`] so one of them can join it.
    join_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
    /// The reporter, held until `on_layer` moves it onto the reporter thread.
    reporter: Option<R>,
    /// Head sampler deciding which new spans are profiled (only used when `H::ENABLED`).
    sampler: H,
}
44
45impl<R> ProfileLayer<R>
46where
47 R: ProfileReporter + Send + Sync + 'static,
48{
49 pub fn new(config: ProfileConfig, reporter: R) -> Self {
51 Self::new_with_sampler(config, reporter, AlwaysSample)
52 }
53}
54
55impl<R, H> ProfileLayer<R, H>
56where
57 R: ProfileReporter + Send + Sync + 'static,
58 H: HeadSampler + 'static,
59{
60 pub fn new_with_sampler(config: ProfileConfig, reporter: R, sampler: H) -> Self {
62 ProfileLayer {
63 span_events: Arc::new(EventQueue::new()),
64 memory_events: Arc::new(EventQueue::new()),
65 span_closed_events: Arc::new(EventQueue::new()),
66 close_events: Arc::new(EventQueue::new()),
67 join_handle: Arc::new(Mutex::new(None)),
68 reporter: Some(reporter),
69 sampler,
70 config,
71 }
72 }
73
74 pub fn config(&self) -> &ProfileConfig {
76 &self.config
77 }
78
79 pub fn shutdown_handle(&self) -> ShutdownHandle {
81 ShutdownHandle {
82 close_events: self.close_events.clone(),
83 join_handle: self.join_handle.clone(),
84 }
85 }
86
87 #[inline]
88 #[must_use]
89 const fn is_tracking(&self) -> bool {
90 self.config.track_allocations || self.config.track_cpu_time || self.config.track_wall_time
91 }
92
93 #[allow(clippy::unused_self)]
94 fn is_excluded<S>(&self, span: &SpanRef<'_, S>) -> bool
95 where
96 S: Subscriber + for<'a> LookupSpan<'a>,
97 {
98 if !H::ENABLED {
99 return false;
100 }
101
102 let ext = span.extensions();
103 ext.get::<SpanExcluded>().is_some()
104 }
105}
106
/// Profiling hooks.
///
/// Every hook returns immediately unless at least one of allocation, CPU-time or
/// wall-time tracking is enabled (`is_tracking`). Spans marked [`SpanExcluded`] are
/// skipped by `on_enter`, `on_exit` and `on_close`.
impl<S, R, H> Layer<S> for ProfileLayer<R, H>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    R: ProfileReporter + Send + Sync + 'static,
    H: HeadSampler + 'static,
{
    /// Called when the layer is attached to a subscriber.
    ///
    /// Installs the global allocation tracker (feeding `memory_events`), initializes
    /// the reporter, and moves it onto a newly spawned reporter thread.
    ///
    /// # Panics
    /// Panics if the reporter was already taken, i.e. if this runs a second time
    /// while tracking is enabled.
    fn on_layer(&mut self, _subscriber: &mut S) {
        if !self.is_tracking() {
            return;
        }

        allocator::set_global_tracker(QueueAllocTracker {
            events: self.memory_events.clone(),
        });

        let mut reporter = self.reporter.take().unwrap();
        reporter.init(&self.config);

        let handle = spawn_reporter_thread(
            reporter,
            self.memory_events.clone(),
            self.span_events.clone(),
            self.span_closed_events.clone(),
            self.close_events.clone(),
            self.config.clone(),
        );

        // Published so a `ShutdownHandle` can signal and join the thread on drop.
        *self.join_handle.lock().unwrap() = Some(handle);
    }

    /// Applies head sampling, attaches timing state, and emits a `Created` event
    /// carrying the span's metadata, scope and selected labels.
    fn on_new_span(
        &self,
        attrs: &span::Attributes<'_>,
        id: &span::Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        if !self.is_tracking() {
            return;
        }

        let span = ctx.span(id).unwrap();

        if H::ENABLED {
            // A child of an excluded span is excluded too. Otherwise the sampler decides.
            // Excluded spans only get the marker: no timing state and no `Created` event.
            // The marker insert runs under `no_track`, presumably so this bookkeeping
            // allocation is not attributed to an allocation group.
            if let Some(parent) = span.parent() {
                if self.is_excluded(&parent) {
                    no_track(|| {
                        span.extensions_mut().insert(SpanExcluded);
                    });
                    return;
                }
            }

            if !self.sampler.should_sample(&span) {
                no_track(|| {
                    span.extensions_mut().insert(SpanExcluded);
                });
                return;
            }
        }

        let mut ext = span.extensions_mut();

        // Both wall clocks start at creation time (see `SpanWallTimings::default`).
        if self.config.track_wall_time {
            no_track(|| {
                ext.insert(SpanWallTimings::default());
            });
        }

        // Zeroed CPU snapshot, overwritten by `on_enter`.
        if self.config.track_cpu_time {
            no_track(|| {
                ext.insert(SpanCpuTimings::default());
            });
        }

        let mut labels = Vec::new();

        // Only visit the span's fields when label filters are configured.
        if !self.config.record_labels.is_empty() {
            no_track(|| {
                attrs.values().record(&mut LabelVisitor {
                    filter: &self.config.record_labels,
                    labels: &mut labels,
                });
            });
        }

        let meta = span.metadata();

        // Ids from `span.scope()`. The reporter thread treats entry 0 as this span
        // (it skips it) and the remaining entries as ancestors.
        let scope = no_track(|| span.scope().map(|s| s.id()).collect());
        self.span_events.push(SpanEvent {
            span_id: id.clone(),
            kind: SpanEventKind::Created(SpanMetadata {
                scope,
                callsite: meta.callsite(),
                target: meta.target(),
                span_name: meta.name(),
                file: span.metadata().file().unwrap_or(""),
                line: span.metadata().line().unwrap_or(0),
                labels,
            }),
        });
    }

    /// Ends the span's idle period and starts its busy period. Also snapshots CPU
    /// usage and enters the span's allocation group, as configured.
    fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }

        let span = ctx.span(id).unwrap();

        if self.is_excluded(&span) {
            return;
        }

        // Start the busy clock and report the idle time since the last exit
        // (or since creation, if this is the first enter).
        if self.config.track_wall_time {
            let mut ext = span.extensions_mut();
            let timings = ext.get_mut::<SpanWallTimings>().unwrap();

            timings.busy_start = Instant::now();

            #[allow(clippy::cast_possible_truncation)]
            let elapsed_nanos = timings.idle_start.elapsed().as_nanos() as u64;

            if elapsed_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Wall(SpanWallTime {
                        elapsed_idle_nanos: elapsed_nanos,
                        elapsed_busy_nanos: 0,
                    }),
                });
            }
        }

        // Snapshot this thread's CPU usage; `on_exit` reports the delta.
        if self.config.track_cpu_time {
            let mut ext = span.extensions_mut();
            let timings = ext.get_mut::<SpanCpuTimings>().unwrap();
            *timings = thread_cpu_timings();
        }

        // Allocations on this thread are grouped under this span until `on_exit`.
        if self.config.track_allocations {
            allocator::enter_allocation_group(AllocationGroup::from(id));
        }
    }

    /// Ends the span's busy period and starts its idle period. Also reports the CPU
    /// delta since `on_enter` and leaves the allocation group, as configured.
    fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }

        let span = ctx.span(id).unwrap();

        if self.is_excluded(&span) {
            return;
        }

        // Restart the idle clock and report the busy time since `on_enter`.
        #[allow(clippy::cast_possible_truncation)]
        if self.config.track_wall_time {
            let mut ext = span.extensions_mut();
            let span_timings = ext.get_mut::<SpanWallTimings>().unwrap();
            span_timings.idle_start = Instant::now();

            let elapsed_nanos = span_timings.busy_start.elapsed().as_nanos() as u64;

            if elapsed_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Wall(SpanWallTime {
                        elapsed_busy_nanos: elapsed_nanos,
                        elapsed_idle_nanos: 0,
                    }),
                });
            }
        }

        // Report user/system CPU time consumed since the `on_enter` snapshot.
        // `saturating_sub` guards against a zeroed snapshot from a failed `getrusage`.
        #[allow(clippy::cast_possible_truncation)]
        if self.config.track_cpu_time {
            let mut ext = span.extensions_mut();
            let timings = ext.get_mut::<SpanCpuTimings>().unwrap();

            let current_timings = thread_cpu_timings();

            let elapsed_user_nanos = (current_timings
                .elapsed_user
                .saturating_sub(timings.elapsed_user))
            .as_nanos() as u64;
            let elapsed_system_nanos = (current_timings
                .elapsed_system
                .saturating_sub(timings.elapsed_system))
            .as_nanos() as u64;

            *timings = current_timings;

            if elapsed_user_nanos > 0 || elapsed_system_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Cpu(SpanCpuTime {
                        elapsed_user_nanos,
                        elapsed_system_nanos,
                    }),
                });
            }
        }

        // Leave the span's allocation group, then flush this thread's allocation
        // counters (presumably onto `memory_events` via the global tracker).
        if self.config.track_allocations {
            allocator::exit_allocation_group();
            allocator::flush_thread_statistics();
        }
    }

    /// Reports the final idle period, flushes allocation counters, and notifies the
    /// reporter thread that the span is closed.
    fn on_close(&self, id: tracing_core::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }

        let span = ctx.span(&id).unwrap();

        if self.is_excluded(&span) {
            return;
        }

        // Idle time since the last exit (or since creation, if never entered).
        // Unlike `on_enter`, this is pushed even when it is zero.
        if self.config.track_wall_time {
            let ext = span.extensions();
            let span_timings = ext.get::<SpanWallTimings>().unwrap();
            #[allow(clippy::cast_possible_truncation)]
            let elapsed_nanos = span_timings.idle_start.elapsed().as_nanos() as u64;
            // Release the extensions lock before pushing the event.
            drop(ext);

            self.span_events.push(SpanEvent {
                span_id: id.clone(),
                kind: SpanEventKind::Wall(SpanWallTime {
                    elapsed_idle_nanos: elapsed_nanos,
                    elapsed_busy_nanos: 0,
                }),
            });
        }

        if self.config.track_allocations {
            allocator::flush_thread_statistics();
        }

        self.span_closed_events.push(SpanClosedEvent {
            span_id: id.clone(),
        });
    }
}
352
/// Handle for stopping the profiler's reporter thread.
///
/// Dropping it, or calling [`ShutdownHandle::shutdown`], pushes the shutdown signal
/// and joins the reporter thread. Only the first handle to do so has any effect,
/// because it takes the join handle out of the shared slot.
pub struct ShutdownHandle {
    /// Queue on which the shutdown signal is pushed.
    close_events: Arc<EventQueue<()>>,
    /// Reporter thread handle shared with the layer; `None` before `on_layer` or after a join.
    join_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
}
363
364impl ShutdownHandle {
365 pub fn shutdown(self) {}
368}
369
370impl Drop for ShutdownHandle {
371 fn drop(&mut self) {
372 if let Some(handle) = self.join_handle.lock().unwrap().take() {
373 self.close_events.push(());
374 handle.join().unwrap();
375 }
376 }
377}
378
/// Snapshot of the current thread's accumulated CPU time, as read by
/// `thread_cpu_timings`, split into user and system time.
#[derive(Clone, Debug, Default)]
struct SpanCpuTimings {
    /// Accumulated user-mode CPU time.
    elapsed_user: Duration,
    /// Accumulated system (kernel) CPU time.
    elapsed_system: Duration,
}
386
/// Wall-clock bookkeeping stored in each profiled span's extensions.
#[derive(Debug)]
struct SpanWallTimings {
    /// When the span last became idle (at creation or on its last exit).
    idle_start: Instant,
    /// When the span was last entered.
    busy_start: Instant,
}

impl Default for SpanWallTimings {
    /// Starts both clocks at the same instant, so a fresh span is idle from creation.
    fn default() -> Self {
        let created_at = Instant::now();
        SpanWallTimings {
            busy_start: created_at,
            idle_start: created_at,
        }
    }
}
403
/// Span-extension marker for spans that are not profiled.
///
/// It is set in `on_new_span` when the head sampler rejects a span, or when the
/// span's parent already carries the marker.
struct SpanExcluded;
406
407struct LabelVisitor<'a> {
408 filter: &'a IndexSet<String>,
409 labels: &'a mut Vec<(&'static str, String)>,
410}
411
412impl Visit for LabelVisitor<'_> {
413 fn record_f64(&mut self, field: &tracing_core::Field, value: f64) {
414 if !self.filter.contains(field.name()) {
415 return;
416 }
417
418 self.labels.push((field.name(), value.to_string()));
419 }
420
421 fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
422 if !self.filter.contains(field.name()) {
423 return;
424 }
425
426 self.labels.push((field.name(), value.to_string()));
427 }
428
429 fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
430 if !self.filter.contains(field.name()) {
431 return;
432 }
433
434 self.labels.push((field.name(), value.to_string()));
435 }
436
437 fn record_i128(&mut self, field: &tracing_core::Field, value: i128) {
438 if !self.filter.contains(field.name()) {
439 return;
440 }
441
442 self.labels.push((field.name(), value.to_string()));
443 }
444
445 fn record_u128(&mut self, field: &tracing_core::Field, value: u128) {
446 if !self.filter.contains(field.name()) {
447 return;
448 }
449
450 self.labels.push((field.name(), value.to_string()));
451 }
452
453 fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
454 if !self.filter.contains(field.name()) {
455 return;
456 }
457
458 self.labels.push((field.name(), value.to_string()));
459 }
460
461 fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
462 if !self.filter.contains(field.name()) {
463 return;
464 }
465
466 self.labels.push((field.name(), value.to_string()));
467 }
468
469 fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
470 if !self.filter.contains(field.name()) {
471 return;
472 }
473
474 self.labels.push((field.name(), format!("{value:?}")));
475 }
476}
477
478fn spawn_reporter_thread<R: ProfileReporter + Send + 'static>(
479 reporter: R,
480 memory_events: Arc<EventQueue<SpanMemoryUpdateEvent>>,
481 span_events: Arc<EventQueue<SpanEvent>>,
482 span_closed_events: Arc<EventQueue<SpanClosedEvent>>,
483 close_events: Arc<EventQueue<()>>,
484 config: ProfileConfig,
485) -> JoinHandle<()> {
486 const TARGET_MAX_EVENTS_PER_ITERATION: u64 = 5000;
487 const BUSY_LOOP_EVENT_THRESHOLD: u64 = TARGET_MAX_EVENTS_PER_ITERATION * 2;
488 const MAX_SLEEP_MS: u64 = 1000;
489
490 std::thread::spawn(move || {
491 let mut reporter = reporter;
492
493 let mut span_scopes: IndexMap<span::Id, Vec<span::Id>> = IndexMap::default();
494 let mut span_descendant_time: IndexMap<span::Id, SpanDescendantTimeStats> =
495 IndexMap::default();
496
497 let mut span_heap: IndexMap<span::Id, SpanMemoryStats> = IndexMap::default();
498
499 let mut last_heap_snapshot = Instant::now();
500
501 loop {
502 let mut iteration_event_count = 0_u64;
503
504 while let Some(event) = span_events.pop() {
505 iteration_event_count += 1;
506
507 match event.kind {
508 SpanEventKind::Created(metadata) => {
509 if config.track_heap {
510 span_heap.insert(
511 event.span_id.clone(),
512 SpanMemoryStats {
513 closed: false,
514 stats: Default::default(),
515 },
516 );
517 }
518
519 if config.track_cpu_time || config.track_wall_time {
520 span_scopes.insert(event.span_id.clone(), metadata.scope.clone());
521 }
522
523 reporter.span_created(&event.span_id, metadata);
524 }
525 SpanEventKind::Cpu(mut cpu) => {
526 if let Some(span_scope) = span_scopes.get(&event.span_id) {
527 for parent_span in span_scope.iter().skip(1) {
528 let time_stats = span_descendant_time
529 .entry(parent_span.clone())
530 .or_insert(SpanDescendantTimeStats {
531 wall_idle_nanos: 0,
532 wall_busy_nanos: 0,
533 cpu_user_nanos: 0,
534 cpu_system_nanos: 0,
535 });
536
537 time_stats.cpu_user_nanos += cpu.elapsed_user_nanos;
538 time_stats.cpu_system_nanos += cpu.elapsed_system_nanos;
539 }
540 }
541
542 cpu.elapsed_user_nanos = cpu.elapsed_user_nanos.saturating_sub(
543 span_descendant_time
544 .get(&event.span_id)
545 .map(|descendants| descendants.cpu_user_nanos)
546 .unwrap_or(0),
547 );
548
549 cpu.elapsed_system_nanos = cpu.elapsed_system_nanos.saturating_sub(
550 span_descendant_time
551 .get(&event.span_id)
552 .map(|descendants| descendants.cpu_system_nanos)
553 .unwrap_or(0),
554 );
555
556 if cpu.elapsed_user_nanos > 0 || cpu.elapsed_system_nanos > 0 {
557 reporter.span_cpu(&event.span_id, cpu);
558 }
559 }
560 SpanEventKind::Wall(mut wall) => {
561 if let Some(span_scope) = span_scopes.get(&event.span_id) {
562 for parent_span in span_scope.iter().skip(1) {
563 let time_stats = span_descendant_time
564 .entry(parent_span.clone())
565 .or_insert(SpanDescendantTimeStats {
566 wall_idle_nanos: 0,
567 wall_busy_nanos: 0,
568 cpu_user_nanos: 0,
569 cpu_system_nanos: 0,
570 });
571
572 time_stats.wall_idle_nanos += wall.elapsed_idle_nanos;
573 time_stats.wall_busy_nanos += wall.elapsed_busy_nanos;
574 }
575 }
576
577 wall.elapsed_busy_nanos = wall.elapsed_busy_nanos.saturating_sub(
578 span_descendant_time
579 .get(&event.span_id)
580 .map(|descendants| descendants.wall_busy_nanos)
581 .unwrap_or(0),
582 );
583
584 wall.elapsed_idle_nanos = wall.elapsed_idle_nanos.saturating_sub(
585 span_descendant_time
586 .get(&event.span_id)
587 .map(|descendants| descendants.wall_idle_nanos)
588 .unwrap_or(0),
589 );
590
591 if wall.elapsed_busy_nanos > 0 || wall.elapsed_idle_nanos > 0 {
592 reporter.span_wall(&event.span_id, wall);
593 }
594 }
595 }
596 }
597
598 while let Some(event) = memory_events.pop() {
599 iteration_event_count += 1;
600
601 if event.stats.allocation_count > 0 || event.stats.allocated_bytes > 0 {
602 reporter.span_allocations(
603 &event.span_id,
604 SpanAllocations {
605 allocation_count: event.stats.allocation_count,
606 allocated_bytes: event.stats.allocated_bytes,
607 },
608 );
609 }
610
611 if !config.track_heap {
612 continue;
613 }
614
615 let Some(tracked_stats) = span_heap.get_mut(&event.span_id) else {
616 continue;
617 };
618
619 tracked_stats.stats.allocated_bytes += event.stats.allocated_bytes;
620 tracked_stats.stats.allocation_count += event.stats.allocation_count;
621 tracked_stats.stats.deallocated_bytes += event.stats.deallocated_bytes;
622 tracked_stats.stats.deallocation_count += event.stats.deallocation_count;
623 }
624
625 while let Some(event) = span_closed_events.pop() {
626 iteration_event_count += 1;
627
628 if config.track_heap {
629 if let Some(tracked_stats) = span_heap.get_mut(&event.span_id) {
630 tracked_stats.closed = true;
633 } else {
634 reporter.span_destroyed(&event.span_id);
635 }
636 } else {
637 reporter.span_destroyed(&event.span_id);
638 }
639
640 if config.track_cpu_time || config.track_wall_time {
641 span_scopes.swap_remove(&event.span_id);
642 span_descendant_time.swap_remove(&event.span_id);
643 }
644 }
645
646 if config.track_heap {
647 span_heap.retain(|span_id, stats| {
650 if stats.closed {
651 let in_use_bytes = stats.stats.in_use_bytes();
652 let in_use_count = stats.stats.in_use_count();
653
654 if in_use_bytes == 0 && in_use_count == 0 {
655 reporter.span_heap(
656 span_id,
657 SpanHeap {
658 in_use_bytes,
659 in_use_count,
660 },
661 );
662
663 reporter.span_destroyed(span_id);
664 return false;
665 }
666 }
667
668 true
669 });
670
671 if span_heap.len() > config.max_closed_heap_spans {
672 let closed_span_count =
673 span_heap.iter().filter(|(_, stats)| stats.closed).count();
674
675 if closed_span_count > config.max_closed_heap_spans {
676 let needs_remove_count = closed_span_count - config.max_closed_heap_spans;
677
678 let mut to_remove = span_heap
679 .iter()
680 .filter_map(|(id, stats)| {
681 if stats.closed {
682 Some((id.clone(), stats.stats.in_use_bytes()))
683 } else {
684 None
685 }
686 })
687 .collect::<Vec<_>>();
688
689 to_remove.sort_unstable_by_key(|(_, bytes)| *bytes);
690 to_remove.truncate(needs_remove_count);
691
692 span_heap.retain(|span_id, _| {
693 if to_remove
694 .iter()
695 .any(|(removed_id, _)| removed_id == span_id)
696 {
697 reporter.span_heap(
698 span_id,
699 SpanHeap {
700 in_use_bytes: 0,
701 in_use_count: 0,
702 },
703 );
704 reporter.span_destroyed(span_id);
705 false
706 } else {
707 true
708 }
709 });
710 tracing::debug!(
711 "dropped {} closed spans from heap tracking due to limits",
712 to_remove.len()
713 );
714 }
715 }
716
717 if last_heap_snapshot.elapsed() >= config.heap_snapshot_interval {
719 last_heap_snapshot = Instant::now();
720
721 for (span_id, stats) in &span_heap {
722 reporter.span_heap(
723 span_id,
724 SpanHeap {
725 in_use_bytes: stats.stats.in_use_bytes(),
726 in_use_count: stats.stats.in_use_count(),
727 },
728 );
729 }
730 }
731 }
732
733 reporter.flush();
734
735 if close_events.pop().is_some() {
736 break;
737 }
738
739 if iteration_event_count > BUSY_LOOP_EVENT_THRESHOLD {
740 continue;
741 }
742
743 #[allow(
746 clippy::cast_sign_loss,
747 clippy::cast_possible_truncation,
748 clippy::cast_possible_wrap
749 )]
750 let sleep_ms = (-i64::try_from(
751 iteration_event_count / (TARGET_MAX_EVENTS_PER_ITERATION / MAX_SLEEP_MS),
752 )
753 .unwrap_or(0)
754 + MAX_SLEEP_MS as i64)
755 .max(5) as u64;
756
757 tracing::trace!(iteration_event_count, sleep_ms, "reporter loop");
758
759 std::thread::sleep(Duration::from_millis(sleep_ms));
760 }
761 })
762}
763
/// Heap bookkeeping for one span, kept on the reporter thread when `track_heap` is enabled.
#[derive(Debug, Clone, Copy)]
struct SpanMemoryStats {
    /// Set once the span's close notification has been processed. A closed span stays
    /// tracked until its in-use bytes and count reach zero, or until it is evicted
    /// by the `max_closed_heap_spans` limit.
    closed: bool,
    /// Accumulated allocation/deallocation counters from the span's memory events.
    stats: GroupAllocationStatistics,
}
773
/// Wall and CPU time reported by a span's descendants.
///
/// The reporter thread subtracts these totals from the span's own measurements, so
/// that the reporter receives the span's self time.
// Every field shares the `_nanos` suffix on purpose (it names the unit), which
// `clippy::struct_field_names` would otherwise flag.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Clone, Copy)]
struct SpanDescendantTimeStats {
    /// Descendant idle wall time, in nanoseconds.
    wall_idle_nanos: u64,
    /// Descendant busy wall time, in nanoseconds.
    wall_busy_nanos: u64,
    /// Descendant user-mode CPU time, in nanoseconds.
    cpu_user_nanos: u64,
    /// Descendant system CPU time, in nanoseconds.
    cpu_system_nanos: u64,
}
785
786#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
787fn thread_cpu_timings() -> SpanCpuTimings {
788 let mut usage = core::mem::MaybeUninit::<libc::rusage>::uninit();
789 let status = unsafe { libc::getrusage(libc::RUSAGE_THREAD, usage.as_mut_ptr()) };
791 if status != 0 {
792 tracing::debug!("getrusage failed: {status}");
793 return SpanCpuTimings::default();
794 }
795 let usage = unsafe { usage.assume_init() };
797
798 let elapsed_user = Duration::new(
799 usage.ru_utime.tv_sec as u64,
800 usage.ru_utime.tv_usec as u32 * 1000,
801 );
802 let elapsed_system = Duration::new(
803 usage.ru_stime.tv_sec as u64,
804 usage.ru_stime.tv_usec as u32 * 1000,
805 );
806 SpanCpuTimings {
807 elapsed_user,
808 elapsed_system,
809 }
810}