use std::{
sync::{Arc, Mutex},
thread::JoinHandle,
time::Duration,
};
use crate::{
IndexMap, IndexSet, Instant,
allocator::{self, AllocationGroup, GroupAllocationStatistics, QueueAllocTracker, no_track},
config::ProfileConfig,
events::{SpanClosedEvent, SpanMemoryUpdateEvent},
reporter::{ProfileReporter, SpanAllocations, SpanCpuTime, SpanMetadata, SpanWallTime},
sampler::{AlwaysSample, HeadSampler},
};
use crate::{
events::{EventQueue, SpanEvent, SpanEventKind},
reporter::SpanHeap,
};
use tracing::{Subscriber, field::Visit, span};
use tracing_subscriber::{
Layer,
registry::{LookupSpan, SpanRef},
};
pub struct ProfileLayer<R, H = AlwaysSample>
where
R: ProfileReporter + Send + Sync + 'static,
{
config: ProfileConfig,
span_events: Arc<EventQueue<SpanEvent>>,
span_closed_events: Arc<EventQueue<SpanClosedEvent>>,
memory_events: Arc<EventQueue<SpanMemoryUpdateEvent>>,
close_events: Arc<EventQueue<()>>,
join_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
reporter: Option<R>,
sampler: H,
}
impl<R> ProfileLayer<R>
where
    R: ProfileReporter + Send + Sync + 'static,
{
    /// Creates a layer using the [`AlwaysSample`] head sampler.
    ///
    /// The reporter is only initialised once the layer is attached to a
    /// subscriber, and only if at least one measurement is enabled.
    pub fn new(config: ProfileConfig, reporter: R) -> Self {
        let sampler = AlwaysSample;
        Self::new_with_sampler(config, reporter, sampler)
    }
}
impl<R, H> ProfileLayer<R, H>
where
    R: ProfileReporter + Send + Sync + 'static,
    H: HeadSampler + 'static,
{
    /// Creates a layer that consults `sampler` (when `H::ENABLED`) before
    /// profiling each new span.
    ///
    /// No thread is started here; the reporter thread is spawned when the
    /// layer is attached to a subscriber.
    pub fn new_with_sampler(config: ProfileConfig, reporter: R, sampler: H) -> Self {
        Self {
            config,
            sampler,
            reporter: Some(reporter),
            span_events: Arc::new(EventQueue::new()),
            span_closed_events: Arc::new(EventQueue::new()),
            memory_events: Arc::new(EventQueue::new()),
            close_events: Arc::new(EventQueue::new()),
            join_handle: Arc::new(Mutex::new(None)),
        }
    }

    /// Returns the configuration this layer was built with.
    pub fn config(&self) -> &ProfileConfig {
        &self.config
    }

    /// Returns a handle that, when dropped (or consumed through
    /// `ShutdownHandle::shutdown`), signals the reporter thread to stop and
    /// joins it.
    pub fn shutdown_handle(&self) -> ShutdownHandle {
        let close_events = Arc::clone(&self.close_events);
        let join_handle = Arc::clone(&self.join_handle);
        ShutdownHandle {
            close_events,
            join_handle,
        }
    }

    /// Whether any measurement (wall time, CPU time, allocations) is enabled.
    #[inline]
    #[must_use]
    const fn is_tracking(&self) -> bool {
        let config = &self.config;
        config.track_wall_time || config.track_cpu_time || config.track_allocations
    }

    /// Whether `span` carries the `SpanExcluded` marker set by head sampling.
    ///
    /// Short-circuits to `false` without touching the span's extensions when
    /// the sampler is disabled (`H::ENABLED == false`).
    #[allow(clippy::unused_self)]
    fn is_excluded<S>(&self, span: &SpanRef<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        H::ENABLED && span.extensions().get::<SpanExcluded>().is_some()
    }
}
impl<S, R, H> Layer<S> for ProfileLayer<R, H>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    R: ProfileReporter + Send + Sync + 'static,
    H: HeadSampler + 'static,
{
    /// Installs the queue-backed allocation tracker, initialises the reporter
    /// and starts the reporter thread. Does nothing (and never initialises the
    /// reporter) when no measurement is enabled.
    fn on_layer(&mut self, _subscriber: &mut S) {
        if !self.is_tracking() {
            return;
        }
        allocator::set_global_tracker(QueueAllocTracker {
            events: self.memory_events.clone(),
        });
        let mut reporter = self
            .reporter
            .take()
            .expect("ProfileLayer::on_layer called more than once");
        reporter.init(&self.config);
        let handle = spawn_reporter_thread(
            reporter,
            self.memory_events.clone(),
            self.span_events.clone(),
            self.span_closed_events.clone(),
            self.close_events.clone(),
            self.config.clone(),
        );
        *self.join_handle.lock().unwrap() = Some(handle);
    }

    /// Applies head sampling, attaches per-span timing state and queues a
    /// `Created` event carrying the span's metadata, scope and labels.
    fn on_new_span(
        &self,
        attrs: &span::Attributes<'_>,
        id: &span::Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        if !self.is_tracking() {
            return;
        }
        let span = ctx.span(id).expect("span passed to on_new_span must exist");
        if H::ENABLED {
            // A span inherits its parent's exclusion; otherwise the sampler
            // decides (it is not consulted when the parent is excluded).
            let parent_excluded = span
                .parent()
                .is_some_and(|parent| self.is_excluded(&parent));
            if parent_excluded || !self.sampler.should_sample(&span) {
                no_track(|| {
                    span.extensions_mut().insert(SpanExcluded);
                });
                return;
            }
        }
        // Scope the write lock on the extensions to the insertions only.
        {
            let mut ext = span.extensions_mut();
            if self.config.track_wall_time {
                no_track(|| {
                    ext.insert(SpanWallTimings::default());
                });
            }
            if self.config.track_cpu_time {
                no_track(|| {
                    ext.insert(SpanCpuTimings::default());
                });
            }
        }
        let mut labels = Vec::new();
        if !self.config.record_labels.is_empty() {
            no_track(|| {
                attrs.values().record(&mut LabelVisitor {
                    filter: &self.config.record_labels,
                    labels: &mut labels,
                });
            });
        }
        let meta = span.metadata();
        let scope = no_track(|| span.scope().map(|s| s.id()).collect());
        self.span_events.push(SpanEvent {
            span_id: id.clone(),
            kind: SpanEventKind::Created(SpanMetadata {
                scope,
                callsite: meta.callsite(),
                target: meta.target(),
                span_name: meta.name(),
                file: meta.file().unwrap_or(""),
                line: meta.line().unwrap_or(0),
                labels,
            }),
        });
    }

    /// Ends the span's idle period (queueing any idle wall time), snapshots
    /// the thread's CPU time and enters the span's allocation group.
    fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }
        let span = ctx.span(id).expect("entered span must exist");
        if self.is_excluded(&span) {
            return;
        }
        if self.config.track_wall_time {
            let mut ext = span.extensions_mut();
            let timings = ext
                .get_mut::<SpanWallTimings>()
                .expect("SpanWallTimings is attached in on_new_span");
            timings.busy_start = Instant::now();
            let elapsed_idle_nanos = saturating_nanos(timings.idle_start.elapsed());
            if elapsed_idle_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Wall(SpanWallTime {
                        elapsed_idle_nanos,
                        elapsed_busy_nanos: 0,
                    }),
                });
            }
        }
        if self.config.track_cpu_time {
            let mut ext = span.extensions_mut();
            let timings = ext
                .get_mut::<SpanCpuTimings>()
                .expect("SpanCpuTimings is attached in on_new_span");
            *timings = thread_cpu_timings();
        }
        if self.config.track_allocations {
            allocator::enter_allocation_group(AllocationGroup::from(id));
        }
    }

    /// Ends the span's busy period, queueing the busy wall time and the CPU
    /// time used since the matching enter, then leaves the allocation group
    /// and flushes this thread's allocation statistics.
    fn on_exit(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }
        let span = ctx.span(id).expect("exited span must exist");
        if self.is_excluded(&span) {
            return;
        }
        if self.config.track_wall_time {
            let mut ext = span.extensions_mut();
            let timings = ext
                .get_mut::<SpanWallTimings>()
                .expect("SpanWallTimings is attached in on_new_span");
            timings.idle_start = Instant::now();
            let elapsed_busy_nanos = saturating_nanos(timings.busy_start.elapsed());
            if elapsed_busy_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Wall(SpanWallTime {
                        elapsed_busy_nanos,
                        elapsed_idle_nanos: 0,
                    }),
                });
            }
        }
        if self.config.track_cpu_time {
            let mut ext = span.extensions_mut();
            let timings = ext
                .get_mut::<SpanCpuTimings>()
                .expect("SpanCpuTimings is attached in on_new_span");
            let current = thread_cpu_timings();
            let elapsed_user_nanos =
                saturating_nanos(current.elapsed_user.saturating_sub(timings.elapsed_user));
            let elapsed_system_nanos =
                saturating_nanos(current.elapsed_system.saturating_sub(timings.elapsed_system));
            *timings = current;
            if elapsed_user_nanos > 0 || elapsed_system_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Cpu(SpanCpuTime {
                        elapsed_user_nanos,
                        elapsed_system_nanos,
                    }),
                });
            }
        }
        if self.config.track_allocations {
            allocator::exit_allocation_group();
            allocator::flush_thread_statistics();
        }
    }

    /// Queues the trailing idle wall time (since the last exit, or since
    /// creation if never entered), flushes this thread's allocation
    /// statistics and queues a close notification.
    fn on_close(&self, id: tracing_core::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        if !self.is_tracking() {
            return;
        }
        let span = ctx.span(&id).expect("closing span must exist");
        if self.is_excluded(&span) {
            return;
        }
        if self.config.track_wall_time {
            let elapsed_idle_nanos = {
                let ext = span.extensions();
                let timings = ext
                    .get::<SpanWallTimings>()
                    .expect("SpanWallTimings is attached in on_new_span");
                saturating_nanos(timings.idle_start.elapsed())
            };
            // Like on_enter/on_exit, empty intervals are not queued.
            if elapsed_idle_nanos > 0 {
                self.span_events.push(SpanEvent {
                    span_id: id.clone(),
                    kind: SpanEventKind::Wall(SpanWallTime {
                        elapsed_idle_nanos,
                        elapsed_busy_nanos: 0,
                    }),
                });
            }
        }
        if self.config.track_allocations {
            allocator::flush_thread_statistics();
        }
        self.span_closed_events.push(SpanClosedEvent {
            span_id: id.clone(),
        });
    }
}

/// Converts `duration` to whole nanoseconds, saturating at `u64::MAX`
/// (about 584 years) instead of silently discarding the high bits.
fn saturating_nanos(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
pub struct ShutdownHandle {
close_events: Arc<EventQueue<()>>,
join_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
}
impl ShutdownHandle {
    /// Stops the reporter thread and waits for it to exit.
    ///
    /// Equivalent to dropping the handle; the actual work lives in the
    /// `Drop` implementation.
    pub fn shutdown(self) {
        drop(self);
    }
}
impl Drop for ShutdownHandle {
    /// Signals the reporter thread to stop and waits for it to finish.
    ///
    /// Only the first handle dropped after the thread was started does this;
    /// later handles (or handles dropped before the layer was attached) find
    /// no join handle and return immediately. A panic from the reporter
    /// thread is re-raised here, except when this thread is already unwinding,
    /// where a second panic would abort the process.
    fn drop(&mut self) {
        // Tolerate poisoning: the guarded `Option` is only ever replaced or
        // taken whole, so it cannot be observed half-updated. The guard is a
        // temporary of this statement, so the lock is released before joining.
        let handle = self
            .join_handle
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        let Some(handle) = handle else {
            return;
        };
        self.close_events.push(());
        match handle.join() {
            Err(payload) if !std::thread::panicking() => std::panic::resume_unwind(payload),
            _ => {}
        }
    }
}
/// Snapshot of the current thread's cumulative CPU time.
///
/// Stored in a span's extensions: refreshed on enter, and diffed against a
/// fresh snapshot on exit to obtain the CPU time spent inside the span.
#[derive(Debug, Default, Clone)]
struct SpanCpuTimings {
    /// Cumulative user-mode CPU time (`ru_utime`).
    elapsed_user: Duration,
    /// Cumulative system (kernel-mode) CPU time (`ru_stime`).
    elapsed_system: Duration,
}
/// Per-span wall-clock bookkeeping stored in the span's extensions.
#[derive(Debug)]
struct SpanWallTimings {
    /// When the span last became idle (creation or last exit); idle time is
    /// measured from here on the next enter and on close.
    idle_start: Instant,
    /// When the span was last entered; busy time is measured from here on exit.
    busy_start: Instant,
}
impl Default for SpanWallTimings {
    /// Starts both the idle and the busy clock at the same instant, i.e. the
    /// moment the timings are created.
    fn default() -> Self {
        let created_at = Instant::now();
        Self {
            busy_start: created_at,
            idle_start: created_at,
        }
    }
}
/// Marker stored in a span's extensions when head sampling rejected the span
/// or one of its ancestors; such spans produce no profiling events.
struct SpanExcluded;
/// Field visitor that turns span attributes whose names appear in `filter`
/// into `(field name, rendered value)` label pairs.
struct LabelVisitor<'a> {
    /// Destination for the collected labels, in visiting order.
    labels: &'a mut Vec<(&'static str, String)>,
    /// Names of the fields to keep; all other fields are ignored.
    filter: &'a IndexSet<String>,
}
impl LabelVisitor<'_> {
    /// Appends `(field name, render())` when the field passes the filter.
    ///
    /// `render` runs only for kept fields, so filtered-out values are never
    /// formatted.
    fn push_label(&mut self, field: &tracing_core::Field, render: impl FnOnce() -> String) {
        let name = field.name();
        if self.filter.contains(name) {
            self.labels.push((name, render()));
        }
    }
}

impl Visit for LabelVisitor<'_> {
    fn record_f64(&mut self, field: &tracing_core::Field, value: f64) {
        self.push_label(field, || value.to_string());
    }

    fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
        self.push_label(field, || value.to_string());
    }

    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
        self.push_label(field, || value.to_string());
    }

    fn record_i128(&mut self, field: &tracing_core::Field, value: i128) {
        self.push_label(field, || value.to_string());
    }

    fn record_u128(&mut self, field: &tracing_core::Field, value: u128) {
        self.push_label(field, || value.to_string());
    }

    fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
        self.push_label(field, || value.to_string());
    }

    /// Strings are stored verbatim (Display), not Debug-quoted.
    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
        self.push_label(field, || value.to_string());
    }

    fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
        self.push_label(field, || format!("{value:?}"));
    }
}
fn spawn_reporter_thread<R: ProfileReporter + Send + 'static>(
reporter: R,
memory_events: Arc<EventQueue<SpanMemoryUpdateEvent>>,
span_events: Arc<EventQueue<SpanEvent>>,
span_closed_events: Arc<EventQueue<SpanClosedEvent>>,
close_events: Arc<EventQueue<()>>,
config: ProfileConfig,
) -> JoinHandle<()> {
const TARGET_MAX_EVENTS_PER_ITERATION: u64 = 5000;
const BUSY_LOOP_EVENT_THRESHOLD: u64 = TARGET_MAX_EVENTS_PER_ITERATION * 2;
const MAX_SLEEP_MS: u64 = 1000;
std::thread::spawn(move || {
let mut reporter = reporter;
let mut span_scopes: IndexMap<span::Id, Vec<span::Id>> = IndexMap::default();
let mut span_descendant_time: IndexMap<span::Id, SpanDescendantTimeStats> =
IndexMap::default();
let mut span_heap: IndexMap<span::Id, SpanMemoryStats> = IndexMap::default();
let mut last_heap_snapshot = Instant::now();
loop {
let mut iteration_event_count = 0_u64;
while let Some(event) = span_events.pop() {
iteration_event_count += 1;
match event.kind {
SpanEventKind::Created(metadata) => {
if config.track_heap {
span_heap.insert(
event.span_id.clone(),
SpanMemoryStats {
closed: false,
stats: Default::default(),
},
);
}
if config.track_cpu_time || config.track_wall_time {
span_scopes.insert(event.span_id.clone(), metadata.scope.clone());
}
reporter.span_created(&event.span_id, metadata);
}
SpanEventKind::Cpu(mut cpu) => {
if let Some(span_scope) = span_scopes.get(&event.span_id) {
for parent_span in span_scope.iter().skip(1) {
let time_stats = span_descendant_time
.entry(parent_span.clone())
.or_insert(SpanDescendantTimeStats {
wall_idle_nanos: 0,
wall_busy_nanos: 0,
cpu_user_nanos: 0,
cpu_system_nanos: 0,
});
time_stats.cpu_user_nanos += cpu.elapsed_user_nanos;
time_stats.cpu_system_nanos += cpu.elapsed_system_nanos;
}
}
cpu.elapsed_user_nanos = cpu.elapsed_user_nanos.saturating_sub(
span_descendant_time
.get(&event.span_id)
.map(|descendants| descendants.cpu_user_nanos)
.unwrap_or(0),
);
cpu.elapsed_system_nanos = cpu.elapsed_system_nanos.saturating_sub(
span_descendant_time
.get(&event.span_id)
.map(|descendants| descendants.cpu_system_nanos)
.unwrap_or(0),
);
if cpu.elapsed_user_nanos > 0 || cpu.elapsed_system_nanos > 0 {
reporter.span_cpu(&event.span_id, cpu);
}
}
SpanEventKind::Wall(mut wall) => {
if let Some(span_scope) = span_scopes.get(&event.span_id) {
for parent_span in span_scope.iter().skip(1) {
let time_stats = span_descendant_time
.entry(parent_span.clone())
.or_insert(SpanDescendantTimeStats {
wall_idle_nanos: 0,
wall_busy_nanos: 0,
cpu_user_nanos: 0,
cpu_system_nanos: 0,
});
time_stats.wall_idle_nanos += wall.elapsed_idle_nanos;
time_stats.wall_busy_nanos += wall.elapsed_busy_nanos;
}
}
wall.elapsed_busy_nanos = wall.elapsed_busy_nanos.saturating_sub(
span_descendant_time
.get(&event.span_id)
.map(|descendants| descendants.wall_busy_nanos)
.unwrap_or(0),
);
wall.elapsed_idle_nanos = wall.elapsed_idle_nanos.saturating_sub(
span_descendant_time
.get(&event.span_id)
.map(|descendants| descendants.wall_idle_nanos)
.unwrap_or(0),
);
if wall.elapsed_busy_nanos > 0 || wall.elapsed_idle_nanos > 0 {
reporter.span_wall(&event.span_id, wall);
}
}
}
}
while let Some(event) = memory_events.pop() {
iteration_event_count += 1;
if event.stats.allocation_count > 0 || event.stats.allocated_bytes > 0 {
reporter.span_allocations(
&event.span_id,
SpanAllocations {
allocation_count: event.stats.allocation_count,
allocated_bytes: event.stats.allocated_bytes,
},
);
}
if !config.track_heap {
continue;
}
let Some(tracked_stats) = span_heap.get_mut(&event.span_id) else {
continue;
};
tracked_stats.stats.allocated_bytes += event.stats.allocated_bytes;
tracked_stats.stats.allocation_count += event.stats.allocation_count;
tracked_stats.stats.deallocated_bytes += event.stats.deallocated_bytes;
tracked_stats.stats.deallocation_count += event.stats.deallocation_count;
}
while let Some(event) = span_closed_events.pop() {
iteration_event_count += 1;
if config.track_heap {
if let Some(tracked_stats) = span_heap.get_mut(&event.span_id) {
tracked_stats.closed = true;
} else {
reporter.span_destroyed(&event.span_id);
}
} else {
reporter.span_destroyed(&event.span_id);
}
if config.track_cpu_time || config.track_wall_time {
span_scopes.swap_remove(&event.span_id);
span_descendant_time.swap_remove(&event.span_id);
}
}
if config.track_heap {
span_heap.retain(|span_id, stats| {
if stats.closed {
let in_use_bytes = stats.stats.in_use_bytes();
let in_use_count = stats.stats.in_use_count();
if in_use_bytes == 0 && in_use_count == 0 {
reporter.span_heap(
span_id,
SpanHeap {
in_use_bytes,
in_use_count,
},
);
reporter.span_destroyed(span_id);
return false;
}
}
true
});
if span_heap.len() > config.max_closed_heap_spans {
let closed_span_count =
span_heap.iter().filter(|(_, stats)| stats.closed).count();
if closed_span_count > config.max_closed_heap_spans {
let needs_remove_count = closed_span_count - config.max_closed_heap_spans;
let mut to_remove = span_heap
.iter()
.filter_map(|(id, stats)| {
if stats.closed {
Some((id.clone(), stats.stats.in_use_bytes()))
} else {
None
}
})
.collect::<Vec<_>>();
to_remove.sort_unstable_by_key(|(_, bytes)| *bytes);
to_remove.truncate(needs_remove_count);
span_heap.retain(|span_id, _| {
if to_remove
.iter()
.any(|(removed_id, _)| removed_id == span_id)
{
reporter.span_heap(
span_id,
SpanHeap {
in_use_bytes: 0,
in_use_count: 0,
},
);
reporter.span_destroyed(span_id);
false
} else {
true
}
});
tracing::debug!(
"dropped {} closed spans from heap tracking due to limits",
to_remove.len()
);
}
}
if last_heap_snapshot.elapsed() >= config.heap_snapshot_interval {
last_heap_snapshot = Instant::now();
for (span_id, stats) in &span_heap {
reporter.span_heap(
span_id,
SpanHeap {
in_use_bytes: stats.stats.in_use_bytes(),
in_use_count: stats.stats.in_use_count(),
},
);
}
}
}
reporter.flush();
if close_events.pop().is_some() {
break;
}
if iteration_event_count > BUSY_LOOP_EVENT_THRESHOLD {
continue;
}
#[allow(
clippy::cast_sign_loss,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap
)]
let sleep_ms = (-i64::try_from(
iteration_event_count / (TARGET_MAX_EVENTS_PER_ITERATION / MAX_SLEEP_MS),
)
.unwrap_or(0)
+ MAX_SLEEP_MS as i64)
.max(5) as u64;
tracing::trace!(iteration_event_count, sleep_ms, "reporter loop");
std::thread::sleep(Duration::from_millis(sleep_ms));
}
})
}
/// Heap bookkeeping for one span, kept by the reporter thread while
/// `track_heap` is enabled.
#[derive(Debug, Clone, Copy)]
struct SpanMemoryStats {
    /// Set when the span's close event is processed. The entry is kept until
    /// its in-use bytes and count reach zero, or it is evicted once more than
    /// `max_closed_heap_spans` closed spans are tracked.
    closed: bool,
    /// Allocation/deallocation totals accumulated from memory events.
    stats: GroupAllocationStatistics,
}
/// Time accumulated from a span's descendant spans, used by the reporter
/// thread to turn the span's inclusive CPU / wall measurements into self time.
// All fields deliberately share the `_nanos` unit suffix, hence the allow.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Clone, Copy)]
struct SpanDescendantTimeStats {
    /// Idle wall-clock nanoseconds reported by descendants.
    wall_idle_nanos: u64,
    /// Busy wall-clock nanoseconds reported by descendants.
    wall_busy_nanos: u64,
    /// User-mode CPU nanoseconds reported by descendants.
    cpu_user_nanos: u64,
    /// System-mode CPU nanoseconds reported by descendants.
    cpu_system_nanos: u64,
}
/// Returns the CPU time consumed so far by the calling thread, split into
/// user and system time, as reported by `getrusage(RUSAGE_THREAD)`.
///
/// On failure the OS error is logged at debug level and zero timings are
/// returned, so a failed sample never interrupts profiling.
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
fn thread_cpu_timings() -> SpanCpuTimings {
    let mut usage = core::mem::MaybeUninit::<libc::rusage>::uninit();
    // SAFETY: `usage.as_mut_ptr()` points to writable storage sized and
    // aligned for a `rusage`, which is all `getrusage` requires.
    let status = unsafe { libc::getrusage(libc::RUSAGE_THREAD, usage.as_mut_ptr()) };
    if status != 0 {
        // `getrusage` signals failure with -1; the actual cause is in errno,
        // read immediately before anything else can overwrite it.
        let err = std::io::Error::last_os_error();
        tracing::debug!("getrusage failed: {err}");
        return SpanCpuTimings::default();
    }
    // SAFETY: `getrusage` returned 0, so it fully initialised `usage`.
    let usage = unsafe { usage.assume_init() };
    // Seconds are never negative, and `tv_usec` is expected to be below
    // 1_000_000, so the nanosecond value fits in a `u32`.
    let to_duration =
        |tv: libc::timeval| Duration::new(tv.tv_sec as u64, tv.tv_usec as u32 * 1000);
    SpanCpuTimings {
        elapsed_user: to_duration(usage.ru_utime),
        elapsed_system: to_duration(usage.ru_stime),
    }
}