use pprof::ProfilerGuardBuilder;
use tokio::sync::watch;
const PUSH_INTERVAL_SECS: u64 = 10;
const SAMPLE_FREQUENCY_HZ: i32 = 100;
pub(crate) struct PyroscopeGuard {
shutdown_tx: watch::Sender<bool>,
#[allow(dead_code)]
handle: tokio::task::JoinHandle<()>,
}
impl Drop for PyroscopeGuard {
fn drop(&mut self) {
let _ = self.shutdown_tx.send(true);
}
}
pub(crate) fn start_pyroscope_push(endpoint: &str, service_name: &str) -> Option<PyroscopeGuard> {
let endpoint = endpoint.to_owned();
let service_name = service_name.to_owned();
let guard = match ProfilerGuardBuilder::default()
.frequency(SAMPLE_FREQUENCY_HZ)
.build()
{
Ok(g) => g,
Err(e) => {
tracing::error!("failed to start pprof profiler: {e}");
return None;
}
};
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(PUSH_INTERVAL_SECS));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_default();
loop {
tokio::select! {
_ = ticker.tick() => {}
_ = shutdown_rx.changed() => break,
}
push_profile(&guard, &client, &endpoint, &service_name).await;
}
tracing::debug!("pyroscope push task shutting down");
});
Some(PyroscopeGuard {
shutdown_tx,
handle,
})
}
async fn push_profile(
guard: &pprof::ProfilerGuard<'static>,
client: &reqwest::Client,
endpoint: &str,
service_name: &str,
) {
use pprof::protos::Message as _;
let report = match guard.report().build() {
Ok(r) => r,
Err(e) => {
tracing::debug!("pprof report build failed: {e}");
return;
}
};
let profile = match report.pprof() {
Ok(p) => p,
Err(e) => {
tracing::debug!("pprof profile encode failed: {e}");
return;
}
};
let mut body = Vec::new();
if let Err(e) = profile.encode(&mut body) {
tracing::debug!("pprof protobuf encode failed: {e}");
return;
}
let url = format!(
"{endpoint}/ingest?name={service_name}.cpu{{sampleRate={SAMPLE_FREQUENCY_HZ}}}&format=pprof"
);
match client.post(&url).body(body).send().await {
Err(e) => tracing::debug!("pyroscope push transport error: {e}"),
Ok(resp) => {
if let Err(e) = resp.error_for_status() {
tracing::warn!("pyroscope push rejected (non-2xx): {e}");
}
}
}
}