use std::{
io::Write,
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::{self, JoinHandle},
time::Duration,
};
use crate::encode::gen::push::{PushRequest, RawProfileSeries, RawSample};
use crate::encode::gen::types::LabelPair;
use crate::{
backend::{Report, ReportBatch, ReportData},
encode::pprof,
pyroscope::PyroscopeConfig,
utils::get_time_range,
Result,
};
use libflate::gzip::Encoder;
use prost::Message;
use reqwest::Url;
use uuid::Uuid;
const LOG_TAG: &str = "Pyroscope::Session";
pub enum SessionSignal {
Session(Box<Session>),
Kill,
}
#[derive(Debug)]
pub struct SessionManager {
pub handle: Option<JoinHandle<Result<()>>>,
pub tx: SyncSender<SessionSignal>,
}
impl SessionManager {
pub fn new() -> Result<Self> {
log::info!(target: LOG_TAG, "Creating SessionManager");
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);
let handle = Some(thread::spawn(move || {
log::trace!(target: LOG_TAG, "Started");
let client = reqwest::blocking::Client::new();
while let Ok(signal) = rx.recv() {
match signal {
SessionSignal::Session(session) => {
match (*session).push(&client) {
Ok(_) => log::trace!("SessionManager - Session sent"),
Err(e) => log::error!("SessionManager - Failed to send session: {}", e),
}
}
SessionSignal::Kill => {
log::trace!(target: LOG_TAG, "Kill signal received");
return Ok(());
}
}
}
Ok(())
}));
Ok(SessionManager { handle, tx })
}
pub fn push(&self, session: SessionSignal) -> Result<()> {
self.tx.send(session)?;
log::trace!(target: LOG_TAG, "SessionSignal pushed");
Ok(())
}
}
pub struct Session {
pub config: PyroscopeConfig,
pub batch: ReportBatch,
pub from: u64,
pub until: u64,
}
impl Session {
pub fn new(until: u64, config: PyroscopeConfig, batch: ReportBatch) -> Result<Self> {
log::info!(target: LOG_TAG, "Creating Session");
let time_range = get_time_range(until)?;
Ok(Self {
config,
batch,
from: time_range.from - 10,
until: time_range.until - 10,
})
}
fn push(self, client: &reqwest::blocking::Client) -> Result<()> {
log::info!(target: LOG_TAG, "Sending Session: {} - {}", self.from, self.until);
let ReportBatch { profile_type, data } = self.batch;
let raw_profile = match data {
ReportData::RawPprof(pprof_bytes) => {
if self.config.func.is_some() {
log::warn!(target: LOG_TAG, "report transform function is not supported with raw pprof backends (e.g. jemalloc)");
}
pprof_bytes
}
ReportData::Reports(reports) => {
let transformed: Vec<Report>;
let encode_input = match self.config.func {
None => &reports,
Some(f) => {
transformed = reports.iter().map(|r| f(r.to_owned())).collect();
&transformed
}
};
pprof::encode(
encode_input,
self.config.sample_rate,
self.from * 1_000_000_000,
(self.until - self.from) * 1_000_000_000,
)
.encode_to_vec()
}
};
let mut labels: Vec<LabelPair> = Vec::with_capacity(2 + self.config.tags.len());
labels.push(LabelPair {
name: "service_name".to_string(),
value: self.config.application_name.clone(),
});
labels.push(LabelPair {
name: "__name__".to_string(),
value: profile_type,
});
for tag in self.config.tags {
labels.push(LabelPair {
name: tag.0,
value: tag.1,
})
}
let req = PushRequest {
series: vec![RawProfileSeries {
labels,
samples: vec![RawSample {
raw_profile,
id: Uuid::new_v4().to_string(),
}],
}],
};
let req = Self::gzip(&req.encode_to_vec())?;
let mut url = Url::parse(&self.config.url)?;
url.path_segments_mut()
.unwrap()
.push("push.v1.PusherService")
.push("Push");
let mut req_builder = client
.post(url.as_str())
.header(
"User-Agent",
format!(
"pyroscope-rs/{}/{} reqwest",
self.config.spy_name, self.config.spy_version
),
)
.header("Content-Type", "application/proto")
.header("Content-Encoding", "gzip");
if let Some(basic_auth) = &self.config.basic_auth {
req_builder = req_builder.basic_auth(
basic_auth.username.clone(),
Some(basic_auth.password.clone()),
);
}
if let Some(tenant_id) = &self.config.tenant_id {
req_builder = req_builder.header("X-Scope-OrgID", tenant_id);
}
for (k, v) in &self.config.http_headers {
req_builder = req_builder.header(k, v);
}
let mut response = req_builder
.body(req)
.timeout(Duration::from_secs(10))
.send()?;
let status = response.status();
if status.is_success() {
let mut sink = std::io::sink();
_ = response.copy_to(&mut sink);
} else {
let resp = response.text();
let resp = match &resp {
Ok(t) => t,
Err(_) => "",
};
log::error!(target: LOG_TAG, "Sending Session failed {} {}", status.as_u16(), resp);
}
Ok(())
}
fn gzip(report: &[u8]) -> Result<Vec<u8>> {
let mut encoder = Encoder::new(Vec::new())?;
encoder.write_all(report)?;
let compressed_data = encoder.finish().into_result()?;
Ok(compressed_data)
}
}