use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::{self, JoinHandle},
time::Duration,
io::Write,
};
use reqwest::Url;
use libflate::gzip::Encoder;
use crate::{
backend::Report,
pyroscope::{PyroscopeConfig, Compression},
utils::{get_time_range, merge_tags_with_app_name},
Result,
PyroscopeError,
encode::{folded, pprof},
};
use crate::backend::EncodedReport;
use crate::pyroscope::ReportEncoding;
/// Log target passed as `target:` to the logging macros in this file.
const LOG_TAG: &str = "Pyroscope::Session";
/// Message delivered to the `SessionManager` worker thread over its channel.
#[derive(Debug)]
pub enum SessionSignal {
/// Carries a `Session`; the worker thread calls `Session::send` on it.
Session(Session),
/// Tells the worker thread to leave its receive loop and return.
Kill,
}
/// Owns the worker thread that sends sessions and the channel used to feed it.
#[derive(Debug)]
pub struct SessionManager {
/// Join handle of the worker thread spawned by `SessionManager::new`.
pub handle: Option<JoinHandle<Result<()>>>,
/// Sending half of the bounded channel on which `SessionSignal`s reach the worker thread.
pub tx: SyncSender<SessionSignal>,
}
impl SessionManager {
pub fn new() -> Result<Self> {
log::info!(target: LOG_TAG, "Creating SessionManager");
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);
let handle = Some(thread::spawn(move || {
log::trace!(target: LOG_TAG, "Started");
while let Ok(signal) = rx.recv() {
match signal {
SessionSignal::Session(session) => {
match session.send() {
Ok(_) => log::trace!("SessionManager - Session sent"),
Err(e) => log::error!("SessionManager - Failed to send session: {}", e),
}
}
SessionSignal::Kill => {
log::trace!(target: LOG_TAG, "Kill signal received");
return Ok(());
}
}
}
Ok(())
}));
Ok(SessionManager { handle, tx })
}
pub fn push(&self, session: SessionSignal) -> Result<()> {
self.tx.send(session)?;
log::trace!(target: LOG_TAG, "SessionSignal pushed");
Ok(())
}
}
/// A batch of profiling reports together with the configuration and time range used to upload them.
#[derive(Clone, Debug)]
pub struct Session {
/// Configuration read while processing, encoding, compressing and uploading the reports.
pub config: PyroscopeConfig,
/// Reports to send; `Session::send` returns immediately when this is empty.
pub reports: Vec<Report>,
/// Start of the time range, sent as the `from` query parameter on upload.
/// NOTE(review): presumably Unix seconds — confirm against `get_time_range`.
pub from: u64,
/// End of the time range, sent as the `until` query parameter on upload.
/// NOTE(review): presumably Unix seconds — confirm against `get_time_range`.
pub until: u64,
}
impl Session {
pub fn new(until: u64, config: PyroscopeConfig, reports: Vec<Report>) -> Result<Self> {
log::info!(target: LOG_TAG, "Creating Session");
let time_range = get_time_range(until)?;
Ok(Self {
config,
reports,
from: time_range.from - 10,
until: time_range.until - 10,
})
}
pub fn send(self) -> Result<()> {
if self.reports.is_empty() {
return Ok(());
}
let reports = self.process_reports(&self.reports);
let reports = self.encode_reports(reports);
let reports = self.compress_reports(reports);
for report in reports {
self.upload(report)?;
}
Ok(())
}
fn process_reports(&self, reports: &Vec<Report>) -> Vec<Report> {
if let Some(func) = self.config.func {
reports.iter().map(|r| func(r.to_owned())).collect()
} else {
reports.to_owned()
}
}
fn encode_reports(&self, reports: Vec<Report>) -> Vec<EncodedReport> {
log::debug!(target: LOG_TAG, "Encoding {} reports to {:?}", reports.len(), self.config.report_encoding);
match &self.config.report_encoding {
ReportEncoding::FOLDED => folded::encode(&reports),
ReportEncoding::PPROF => pprof::encode(&reports,
self.config.sample_rate,
self.from * 1_000_000,
(self.until - self.from) * 1_000_000,
),
}
}
fn compress_reports(&self, reports: Vec<EncodedReport>) -> Vec<EncodedReport> {
log::debug!(target: LOG_TAG, "Compressing {} reports to {:?}", reports.len(), self.config.compression);
reports.into_iter().map(|r|
match &self.config.compression {
None => r,
Some(Compression::GZIP) => {
let mut encoder = Encoder::new(Vec::new()).unwrap();
encoder.write_all(&r.data).unwrap();
let compressed_data = encoder.finish().into_result().unwrap();
EncodedReport {
format: r.format,
content_type: r.content_type,
metadata: r.metadata,
content_encoding: "gzip".to_string(),
data: compressed_data,
}
}
}
).collect()
}
fn upload(&self, report: EncodedReport) -> Result<()> {
log::info!(target: LOG_TAG, "Sending Session: {} - {}", self.from, self.until);
if report.data.is_empty() {
return Ok(());
}
let client = reqwest::blocking::Client::new();
let application_name = merge_tags_with_app_name(
self.config.application_name.clone(),
report.metadata.tags.clone().into_iter().collect(),
)?;
let mut url = Url::parse(&self.config.url)?;
url.path_segments_mut()
.map_err(|_e| PyroscopeError::new("url construction failure - cannot_be_a_base"))?
.push("ingest");
let mut req_builder = client
.post(url.as_str())
.header("Content-Type", report.content_type.as_str());
if let Some(auth_token) = &self.config.auth_token {
req_builder = req_builder.bearer_auth(auth_token);
} else if let Some(basic_auth) = &self.config.basic_auth {
req_builder = req_builder.basic_auth(basic_auth.username.clone(), Some(basic_auth.password.clone()));
}
if report.content_encoding != "" {
req_builder = req_builder.header("Content-Encoding", report.content_encoding.as_str());
}
if let Some(tenant_id) = &self.config.tenant_id {
req_builder = req_builder.header("X-Scope-OrgID", tenant_id);
}
for (k, v) in &self.config.http_headers {
req_builder = req_builder.header(k, v);
};
let response = req_builder
.query(&[
("name", application_name.as_str()),
("from", &format!("{}", self.from)),
("until", &format!("{}", self.until)),
("format", report.format.as_str()),
("sampleRate", &format!("{}", self.config.sample_rate)),
("spyName", self.config.spy_name.as_str()),
])
.body(report.data)
.timeout(Duration::from_secs(10))
.send()?;
if !response.status().is_success() {
log::error!(target: LOG_TAG, "Sending Session failed {}", response.status().as_u16());
}
Ok(())
}
}