use std::{io::Write, iter, time::SystemTime};
use backend::PProfBackend;
use flate2::write::GzEncoder;
use prost::Message;
use tracing::span;
use tracing_core::callsite;
use crate::{
IndexMap, IndexSet,
config::ProfileConfig,
reporter::{SpanAllocations, SpanCpuTime, SpanHeap, SpanWallTime},
};
use super::{ProfileReporter, SpanMetadata};
pub mod backend;
#[path = "pprof/perftools.profiles.rs"]
#[allow(clippy::pedantic, missing_docs)]
#[rustfmt::skip]
mod proto;
/// Options that control how a [`PProfReporter`] turns collected samples into profiles.
///
/// Built in builder style: start from [`PProfReporterConfig::new`] and chain setters.
#[derive(Debug, Default, Clone)]
#[must_use]
pub struct PProfReporterConfig {
    /// When `true`, queued samples are merged per span before profiles are built.
    aggregate_samples: bool,
}

impl PProfReporterConfig {
    /// Creates a configuration with every option at its default
    /// (sample aggregation disabled).
    pub fn new() -> Self {
        Self {
            aggregate_samples: false,
        }
    }

    /// Enables or disables merging of samples that belong to the same span.
    ///
    /// Consumes the configuration and returns the updated value so calls can be chained.
    pub fn aggregate_samples(self, aggregate: bool) -> Self {
        let mut updated = self;
        updated.aggregate_samples = aggregate;
        updated
    }
}
/// Profile reporter that queues per-span samples and, on flush, hands encoded pprof
/// profiles to the backend `B`.
#[must_use]
pub struct PProfReporter<B> {
// Accumulates span metadata and samples, and builds the encoded profiles.
builder: PProfProfileBuilder,
// Reporter options (currently only whether samples are aggregated before building).
config: PProfReporterConfig,
// Destination that receives every encoded profile.
backend: B,
}
impl<B> PProfReporter<B>
where
B: PProfBackend + Send + Sync + 'static,
{
pub fn new(config: PProfReporterConfig, backend: B) -> Self {
let builder = PProfProfileBuilder {
..Default::default()
};
Self {
builder,
config,
backend,
}
}
#[must_use]
pub fn backend(&self) -> &B {
&self.backend
}
}
impl<B> ProfileReporter for PProfReporter<B>
where
B: PProfBackend,
{
fn init(&mut self, config: &ProfileConfig) {
self.builder.max_samples = if config.max_samples > 0 {
Some(config.max_samples)
} else {
None
};
}
fn span_created(&mut self, span: &span::Id, meta: SpanMetadata) {
let callsite = meta.callsite.clone();
let location_id = self.builder.location_id(&meta);
let call_stack = self.builder.call_stack(location_id, meta.scope);
let span_info = SpanInfo {
call_stack,
callsite,
labels: meta.labels,
alive: true,
};
self.builder.spans.insert(span.clone(), span_info);
}
fn span_destroyed(&mut self, span: &span::Id) {
if let Some(span_info) = self.builder.spans.get_mut(span) {
span_info.alive = false;
}
}
fn span_cpu(&mut self, span: &span::Id, cpu: SpanCpuTime) {
self.builder.cpu_samples.push(SpanCpuSample {
span_id: span.clone(),
user_nanos: cpu.elapsed_user_nanos,
system_nanos: cpu.elapsed_system_nanos,
});
}
fn span_wall(&mut self, span: &span::Id, wall: SpanWallTime) {
self.builder.wall_samples.push(SpanWallSample {
span_id: span.clone(),
busy_nanos: wall.elapsed_busy_nanos,
idle_nanos: wall.elapsed_idle_nanos,
});
}
fn span_allocations(&mut self, span: &span::Id, allocs: SpanAllocations) {
self.builder.alloc_samples.push(SpanAllocSample {
span_id: span.clone(),
allocation_count: allocs.allocation_count,
allocated_bytes: allocs.allocated_bytes,
});
}
fn span_heap(&mut self, span: &span::Id, heap: SpanHeap) {
self.builder.heap_samples.push(SpanHeapSample {
span_id: span.clone(),
in_use_count: heap.in_use_count,
in_use_bytes: heap.in_use_bytes,
});
}
fn flush(&mut self) {
if self.config.aggregate_samples {
self.builder.aggregate_samples();
}
while self.builder.has_samples() {
let profiles = self.builder.build_pprof();
if let Some(profile) = profiles.cpu {
self.backend
.save_profile("cpu", profiles.start, profiles.end, profile);
}
if let Some(profile) = profiles.allocations {
self.backend
.save_profile("alloc", profiles.start, profiles.end, profile);
}
if let Some(profile) = profiles.wall_clock {
self.backend
.save_profile("wall", profiles.start, profiles.end, profile);
}
if let Some(profile) = profiles.heap {
self.backend
.save_profile("heap", profiles.start, profiles.end, profile);
}
}
}
}
/// Source location of a span callsite, used to emit pprof functions and locations.
#[derive(Debug)]
struct SpanLocation {
// Span target; joined with `span_name` by "::" to form the pprof function name.
target: &'static str,
// Name of the span; second half of the pprof function name.
span_name: &'static str,
// Source file, emitted as the pprof function's filename.
file: &'static str,
// Source line, emitted both as the function start line and as the location line.
line: i64,
}
/// One queued CPU-time measurement for a span.
struct SpanCpuSample {
// Span the measurement belongs to.
span_id: span::Id,
// Elapsed user-mode CPU time, in nanoseconds.
user_nanos: u64,
// Elapsed system-mode CPU time, in nanoseconds.
system_nanos: u64,
}
/// One queued wall-clock measurement for a span.
struct SpanWallSample {
// Span the measurement belongs to.
span_id: span::Id,
// Elapsed busy time, in nanoseconds.
busy_nanos: u64,
// Elapsed idle time, in nanoseconds.
idle_nanos: u64,
}
/// One queued allocation measurement for a span.
struct SpanAllocSample {
// Span the measurement belongs to.
span_id: span::Id,
// Number of allocations; reported as `alloc_objects`.
allocation_count: u64,
// Number of bytes allocated; reported as `alloc_space`.
allocated_bytes: u64,
}
/// One queued heap-usage measurement for a span.
struct SpanHeapSample {
// Span the measurement belongs to.
span_id: span::Id,
// Number of objects in use; reported as `inuse_objects`.
in_use_count: u64,
// Number of bytes in use; reported as `inuse_space`.
in_use_bytes: u64,
}
/// Everything the builder remembers about a span so its samples can be serialized later.
struct SpanInfo {
// pprof location ids for the span: its own location first, followed by the locations
// resolved from the span's scope (presumably its ancestors — confirm with the
// producer of `SpanMetadata::scope`).
call_stack: Vec<u64>,
// Callsite of the span; used to look up its location when it appears in the scope
// of a span created later.
callsite: callsite::Identifier,
// Key/value labels attached to every sample of this span.
labels: Vec<(&'static str, String)>,
// Cleared when the span is destroyed; spans that are not alive are pruned once no
// queued sample refers to them.
alive: bool,
}
/// The result of one profile-building round: the covered time window plus one
/// optional encoded profile per sample kind.
///
/// Each profile is a gzip-compressed, protobuf-encoded pprof profile and is `None`
/// when no samples of that kind were queued.
struct PProfProfiles {
// Start of the window covered by these profiles.
start: SystemTime,
// End of the window (the time at which the profiles were built).
end: SystemTime,
// Allocation profile (`alloc_objects` / `alloc_space`).
allocations: Option<Vec<u8>>,
// Heap profile (`inuse_objects` / `inuse_space`).
heap: Option<Vec<u8>>,
// CPU profile (`cpu` / `cpu_system` / `cpu_user`).
cpu: Option<Vec<u8>>,
// Wall-clock profile (`wall` / `wall_busy` / `wall_idle`).
wall_clock: Option<Vec<u8>>,
}
/// Accumulates span metadata and samples and converts them into pprof profiles.
pub(crate) struct PProfProfileBuilder {
// Known callsites in insertion order; a location's pprof id is its index plus one.
locations: IndexMap<callsite::Identifier, SpanLocation>,
// Information about every span that may still be referenced by a sample.
spans: IndexMap<span::Id, SpanInfo>,
// Maximum number of samples of each kind drained into a single profile;
// `None` means no limit.
max_samples: Option<usize>,
// Queued CPU-time samples.
cpu_samples: Vec<SpanCpuSample>,
// Queued wall-clock samples.
wall_samples: Vec<SpanWallSample>,
// Queued allocation samples.
alloc_samples: Vec<SpanAllocSample>,
// Queued heap-usage samples.
heap_samples: Vec<SpanHeapSample>,
// Start of the window covered by the next set of profiles.
start: SystemTime,
}
impl Default for PProfProfileBuilder {
fn default() -> Self {
Self {
locations: Default::default(),
cpu_samples: Default::default(),
wall_samples: Default::default(),
alloc_samples: Default::default(),
heap_samples: Default::default(),
max_samples: None,
spans: Default::default(),
start: SystemTime::now(),
}
}
}
impl PProfProfileBuilder {
/// Returns `true` while at least one sample of any kind is still queued.
fn has_samples(&self) -> bool {
    let all_queues_empty = self.cpu_samples.is_empty()
        && self.wall_samples.is_empty()
        && self.alloc_samples.is_empty()
        && self.heap_samples.is_empty();
    !all_queues_empty
}
fn aggregate_samples(&mut self) {
if !self.cpu_samples.is_empty() {
let mut aggregated_samples = IndexMap::default();
for sample in self.cpu_samples.drain(..) {
let entry = aggregated_samples
.entry(sample.span_id.clone())
.or_insert_with(|| SpanCpuSample {
span_id: sample.span_id.clone(),
user_nanos: 0,
system_nanos: 0,
});
entry.user_nanos += sample.user_nanos;
entry.system_nanos += sample.system_nanos;
}
self.cpu_samples.extend(aggregated_samples.into_values());
}
if !self.wall_samples.is_empty() {
let mut aggregated_samples = IndexMap::default();
for sample in self.wall_samples.drain(..) {
let entry = aggregated_samples
.entry(sample.span_id.clone())
.or_insert_with(|| SpanWallSample {
span_id: sample.span_id.clone(),
busy_nanos: 0,
idle_nanos: 0,
});
entry.busy_nanos += sample.busy_nanos;
entry.idle_nanos += sample.idle_nanos;
}
self.wall_samples.extend(aggregated_samples.into_values());
}
if !self.alloc_samples.is_empty() {
let mut aggregated_samples = IndexMap::default();
for sample in self.alloc_samples.drain(..) {
let entry = aggregated_samples
.entry(sample.span_id.clone())
.or_insert_with(|| SpanAllocSample {
span_id: sample.span_id.clone(),
allocation_count: 0,
allocated_bytes: 0,
});
entry.allocation_count += sample.allocation_count;
entry.allocated_bytes += sample.allocated_bytes;
}
self.alloc_samples.extend(aggregated_samples.into_values());
}
if !self.heap_samples.is_empty() {
let mut aggregated_samples = IndexMap::default();
for sample in self.heap_samples.drain(..) {
let entry = aggregated_samples
.entry(sample.span_id.clone())
.or_insert_with(|| SpanHeapSample {
span_id: sample.span_id.clone(),
in_use_count: 0,
in_use_bytes: 0,
});
entry.in_use_count = sample.in_use_count;
entry.in_use_bytes = sample.in_use_bytes;
}
self.heap_samples.extend(aggregated_samples.into_values());
}
}
#[must_use]
fn build_pprof(&mut self) -> PProfProfiles {
let now = SystemTime::now();
let mut profile_data = PProfProfiles {
start: self.start,
end: now,
allocations: None,
heap: None,
cpu: None,
wall_clock: None,
};
#[allow(clippy::cast_possible_truncation)]
if let Some(mut profile) = self.build_cpu_profile() {
profile.time_nanos = profile_data
.start
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
profile.duration_nanos = profile_data
.end
.duration_since(profile_data.start)
.unwrap()
.as_nanos() as i64;
profile_data.cpu = Some(encode_profile(profile));
self.cpu_samples.clear();
}
#[allow(clippy::cast_possible_truncation)]
if let Some(mut profile) = self.build_alloc_profile() {
profile.time_nanos = profile_data
.start
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
profile.duration_nanos = profile_data
.end
.duration_since(profile_data.start)
.unwrap()
.as_nanos() as i64;
profile_data.allocations = Some(encode_profile(profile));
self.alloc_samples.clear();
}
#[allow(clippy::cast_possible_truncation)]
if let Some(mut profile) = self.build_wall_clock_profile() {
profile.time_nanos = profile_data
.start
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
profile.duration_nanos = profile_data
.end
.duration_since(profile_data.start)
.unwrap()
.as_nanos() as i64;
profile_data.wall_clock = Some(encode_profile(profile));
self.wall_samples.clear();
}
#[allow(clippy::cast_possible_truncation)]
if let Some(mut profile) = self.build_heap_profile() {
profile.time_nanos = profile_data
.start
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
profile.duration_nanos = profile_data
.end
.duration_since(profile_data.start)
.unwrap()
.as_nanos() as i64;
profile_data.heap = Some(encode_profile(profile));
self.heap_samples.clear();
}
self.start = now;
self.spans.retain(|span_id, span| {
if span.alive {
true
} else {
let mut span_ids_in_remaining_samples = self
.cpu_samples
.iter()
.map(|sample| &sample.span_id)
.chain(self.wall_samples.iter().map(|sample| &sample.span_id))
.chain(self.alloc_samples.iter().map(|sample| &sample.span_id))
.chain(self.heap_samples.iter().map(|sample| &sample.span_id));
span_ids_in_remaining_samples.any(|id| id == span_id)
}
});
profile_data
}
/// Converts queued CPU samples into an unencoded pprof profile.
///
/// Returns `None` when no CPU samples are queued. At most `max_samples` samples are
/// drained from the front of the queue; samples whose span is unknown are skipped.
/// Every sample carries three values: total, system and user CPU time in nanoseconds.
fn build_cpu_profile(&mut self) -> Option<proto::Profile> {
    if self.cpu_samples.is_empty() {
        return None;
    }

    let mut strings = StringTable::default();
    let mut profile = proto::Profile::default();

    // The order of these columns must match the order of `values` below.
    for kind in ["cpu", "cpu_system", "cpu_user"] {
        let type_idx = strings.add(kind);
        let unit_idx = strings.add("nanoseconds");
        profile.sample_type.push(proto::ValueType {
            r#type: type_idx,
            unit: unit_idx,
        });
    }

    let queued = self.cpu_samples.len();
    let take = self.max_samples.map_or(queued, |cap| cap.min(queued));

    for sample in self.cpu_samples.drain(..take) {
        let Some(span_info) = self.spans.get(&sample.span_id) else {
            tracing::debug!("span information not found for sample");
            continue;
        };

        let total_nanos = sample.user_nanos + sample.system_nanos;
        #[allow(clippy::cast_possible_wrap)]
        let values = vec![
            total_nanos as i64,
            sample.system_nanos as i64,
            sample.user_nanos as i64,
        ];

        let mut labels = Vec::with_capacity(span_info.labels.len());
        for (key, value) in &span_info.labels {
            let key_idx = strings.add(key);
            let value_idx = strings.add(value);
            labels.push(proto::Label {
                key: key_idx,
                str: value_idx,
                ..Default::default()
            });
        }

        profile.sample.push(proto::Sample {
            value: values,
            location_id: span_info.call_stack.clone(),
            label: labels,
        });
    }

    self.serialize_locations(&mut strings, &mut profile);
    profile.string_table = strings.table.into_iter().collect();
    Some(profile)
}
/// Converts queued allocation samples into an unencoded pprof profile.
///
/// Returns `None` when no allocation samples are queued. At most `max_samples` samples
/// are drained from the front of the queue; samples whose span is unknown are skipped.
/// Every sample carries two values: the allocation count and the allocated bytes.
fn build_alloc_profile(&mut self) -> Option<proto::Profile> {
    if self.alloc_samples.is_empty() {
        return None;
    }

    let mut strings = StringTable::default();
    let mut profile = proto::Profile::default();

    // The order of these columns must match the order of `values` below.
    for (kind, unit) in [("alloc_objects", "count"), ("alloc_space", "bytes")] {
        let type_idx = strings.add(kind);
        let unit_idx = strings.add(unit);
        profile.sample_type.push(proto::ValueType {
            r#type: type_idx,
            unit: unit_idx,
        });
    }

    let queued = self.alloc_samples.len();
    let take = self.max_samples.map_or(queued, |cap| cap.min(queued));

    for sample in self.alloc_samples.drain(..take) {
        let Some(span_info) = self.spans.get(&sample.span_id) else {
            tracing::debug!("span information not found for sample");
            continue;
        };

        #[allow(clippy::cast_possible_wrap)]
        let values = vec![
            sample.allocation_count as i64,
            sample.allocated_bytes as i64,
        ];

        let mut labels = Vec::with_capacity(span_info.labels.len());
        for (key, value) in &span_info.labels {
            let key_idx = strings.add(key);
            let value_idx = strings.add(value);
            labels.push(proto::Label {
                key: key_idx,
                str: value_idx,
                ..Default::default()
            });
        }

        profile.sample.push(proto::Sample {
            value: values,
            location_id: span_info.call_stack.clone(),
            label: labels,
        });
    }

    self.serialize_locations(&mut strings, &mut profile);
    profile.string_table = strings.table.into_iter().collect();
    Some(profile)
}
/// Converts queued wall-clock samples into an unencoded pprof profile.
///
/// Returns `None` when no wall-clock samples are queued. At most `max_samples` samples
/// are drained from the front of the queue; samples whose span is unknown are skipped.
/// Every sample carries three values: total, busy and idle time in nanoseconds.
fn build_wall_clock_profile(&mut self) -> Option<proto::Profile> {
    if self.wall_samples.is_empty() {
        return None;
    }

    let mut strings = StringTable::default();
    let mut profile = proto::Profile::default();

    // The order of these columns must match the order of `values` below.
    for kind in ["wall", "wall_busy", "wall_idle"] {
        let type_idx = strings.add(kind);
        let unit_idx = strings.add("nanoseconds");
        profile.sample_type.push(proto::ValueType {
            r#type: type_idx,
            unit: unit_idx,
        });
    }

    let queued = self.wall_samples.len();
    let take = self.max_samples.map_or(queued, |cap| cap.min(queued));

    for sample in self.wall_samples.drain(..take) {
        let Some(span_info) = self.spans.get(&sample.span_id) else {
            tracing::debug!("span information not found for sample");
            continue;
        };

        let total_nanos = sample.busy_nanos + sample.idle_nanos;
        #[allow(clippy::cast_possible_wrap)]
        let values = vec![
            total_nanos as i64,
            sample.busy_nanos as i64,
            sample.idle_nanos as i64,
        ];

        let mut labels = Vec::with_capacity(span_info.labels.len());
        for (key, value) in &span_info.labels {
            let key_idx = strings.add(key);
            let value_idx = strings.add(value);
            labels.push(proto::Label {
                key: key_idx,
                str: value_idx,
                ..Default::default()
            });
        }

        profile.sample.push(proto::Sample {
            value: values,
            location_id: span_info.call_stack.clone(),
            label: labels,
        });
    }

    self.serialize_locations(&mut strings, &mut profile);
    profile.string_table = strings.table.into_iter().collect();
    Some(profile)
}
/// Converts queued heap samples into an unencoded pprof profile.
///
/// Returns `None` when no heap samples are queued. At most `max_samples` samples are
/// drained from the front of the queue and then visited newest-first, so that only the
/// most recent drained sample of each span is reported. Samples whose span is unknown
/// are skipped. Every reported sample carries the in-use object count and in-use bytes.
fn build_heap_profile(&mut self) -> Option<proto::Profile> {
    if self.heap_samples.is_empty() {
        return None;
    }

    let mut strings = StringTable::default();
    let mut profile = proto::Profile::default();

    // The order of these columns must match the order of `values` below.
    for (kind, unit) in [("inuse_objects", "count"), ("inuse_space", "bytes")] {
        let type_idx = strings.add(kind);
        let unit_idx = strings.add(unit);
        profile.sample_type.push(proto::ValueType {
            r#type: type_idx,
            unit: unit_idx,
        });
    }

    let queued = self.heap_samples.len();
    let take = self.max_samples.map_or(queued, |cap| cap.min(queued));

    let mut reported_spans = IndexSet::default();
    for sample in self.heap_samples.drain(..take).rev() {
        let Some(span_info) = self.spans.get(&sample.span_id) else {
            tracing::debug!("span information not found for sample");
            continue;
        };

        // `insert` reports `false` when a newer sample of this span was already emitted.
        let first_for_span = reported_spans.insert(sample.span_id.clone());
        if !first_for_span {
            continue;
        }

        #[allow(clippy::cast_possible_wrap)]
        let values = vec![sample.in_use_count as i64, sample.in_use_bytes as i64];

        let mut labels = Vec::with_capacity(span_info.labels.len());
        for (key, value) in &span_info.labels {
            let key_idx = strings.add(key);
            let value_idx = strings.add(value);
            labels.push(proto::Label {
                key: key_idx,
                str: value_idx,
                ..Default::default()
            });
        }

        profile.sample.push(proto::Sample {
            value: values,
            location_id: span_info.call_stack.clone(),
            label: labels,
        });
    }

    self.serialize_locations(&mut strings, &mut profile);

    // The period strings are added after the locations, which fixes their position in
    // the string table.
    let period_kind = strings.add("space");
    let period_unit = strings.add("bytes");
    profile.period_type = Some(proto::ValueType {
        r#type: period_kind,
        unit: period_unit,
    });
    profile.period = 1;

    profile.string_table = strings.table.into_iter().collect();
    Some(profile)
}
/// Appends one pprof function and one pprof location to `profile` for every known
/// callsite.
///
/// Ids are assigned in insertion order starting at 1, and a function shares its id with
/// its location. The function name is `<target>::<span name>`; strings are interned in
/// `string_table`.
fn serialize_locations(&self, string_table: &mut StringTable, profile: &mut proto::Profile) {
    for (id, location) in (1u64..).zip(self.locations.values()) {
        let qualified_name = format!("{}::{}", location.target, location.span_name);
        let name_idx = string_table.add(&qualified_name);
        let file_idx = string_table.add(location.file);

        profile.function.push(proto::Function {
            id,
            name: name_idx,
            system_name: name_idx,
            filename: file_idx,
            start_line: location.line,
        });

        let frame = proto::Line {
            function_id: id,
            line: location.line,
            column: 0,
        };
        profile.location.push(proto::Location {
            id,
            line: vec![frame],
            is_folded: false,
            ..Default::default()
        });
    }
}
/// Builds the list of location ids for a span.
///
/// The result starts with `this_location_id`. The first entry of `scope` is skipped
/// (presumably it is the span itself — confirm with the producer of `scope`); every
/// remaining span contributes the id of its callsite's location. Spans that are not
/// known, or whose callsite has no registered location, are left out.
fn call_stack(&mut self, this_location_id: u64, scope: Vec<span::Id>) -> Vec<u64> {
    let mut stack = vec![this_location_id];
    for scope_span in scope.into_iter().skip(1) {
        let Some(info) = self.spans.get(&scope_span) else {
            continue;
        };
        if let Some(index) = self.locations.get_index_of(&info.callsite) {
            // Location ids are 1-based positions in `locations`.
            stack.push((index + 1) as u64);
        }
    }
    stack
}
/// Returns the location id for the callsite described by `meta`, registering the
/// callsite first when it has not been seen before.
///
/// Ids are 1-based positions in `locations`, so a newly registered callsite receives
/// the next id in sequence.
fn location_id(&mut self, meta: &SpanMetadata) -> u64 {
    let callsite = meta.callsite.clone();

    let index = match self.locations.get_index_of(&callsite) {
        Some(existing) => existing,
        None => {
            let location = SpanLocation {
                target: meta.target,
                span_name: meta.span_name,
                file: meta.file,
                line: i64::from(meta.line),
            };
            self.locations.insert(callsite, location);
            // The new entry was appended, so it sits at the last position.
            self.locations.len() - 1
        }
    };

    (index + 1) as u64
}
}
/// Deduplicating string table for a pprof profile; a string is referred to by its
/// position in the table.
#[derive(Debug)]
struct StringTable {
// Interned strings in insertion order; the position of a string is its index.
table: IndexSet<String>,
}
impl Default for StringTable {
fn default() -> Self {
let mut table = IndexSet::default();
table.insert(String::new());
Self { table }
}
}
impl StringTable {
    /// Interns `s` and returns its index in the table.
    ///
    /// A string that is already present keeps its existing index; a new string is
    /// appended and receives the next index.
    #[allow(clippy::cast_possible_wrap)]
    fn add(&mut self, s: &str) -> i64 {
        let index = match self.table.get_index_of(s) {
            Some(existing) => existing,
            None => {
                self.table.insert(s.to_string());
                self.table.len() - 1
            }
        };
        index as i64
    }
}
/// Serializes `profile` to protobuf and gzip-compresses the result.
///
/// The gzip stream is completed with `GzEncoder::finish`, so the trailer is written
/// explicitly and any error surfaces here instead of being swallowed when the encoder
/// is dropped.
///
/// # Panics
///
/// Only if encoding or compressing into an in-memory buffer fails, which is not
/// expected to happen because the buffers grow on demand.
fn encode_profile(profile: proto::Profile) -> Vec<u8> {
    let mut raw = Vec::with_capacity(profile.encoded_len());
    profile
        .encode(&mut raw)
        .expect("encoding into a growable Vec<u8> cannot run out of capacity");

    let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
    encoder
        .write_all(&raw)
        .expect("writing to an in-memory buffer cannot fail");
    encoder
        .finish()
        .expect("finishing an in-memory gzip stream cannot fail")
}