use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::{self, JoinHandle},
time::Duration,
};
use crate::{
backend::Report,
pyroscope::PyroscopeConfig,
utils::{get_time_range, merge_tags_with_app_name},
Result,
};
const LOG_TAG: &str = "Pyroscope::Session";
#[derive(Debug)]
pub enum SessionSignal {
Session(Session),
Kill,
}
#[derive(Debug)]
pub struct SessionManager {
pub handle: Option<JoinHandle<Result<()>>>,
pub tx: SyncSender<SessionSignal>,
}
impl SessionManager {
pub fn new() -> Result<Self> {
log::info!(target: LOG_TAG, "Creating SessionManager");
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);
let handle = Some(thread::spawn(move || {
log::trace!(target: LOG_TAG, "Started");
while let Ok(signal) = rx.recv() {
match signal {
SessionSignal::Session(session) => {
match session.send() {
Ok(_) => log::trace!("SessionManager - Session sent"),
Err(e) => log::error!("SessionManager - Failed to send session: {}", e),
}
}
SessionSignal::Kill => {
log::trace!(target: LOG_TAG, "Kill signal received");
return Ok(());
}
}
}
Ok(())
}));
Ok(SessionManager { handle, tx })
}
pub fn push(&self, session: SessionSignal) -> Result<()> {
self.tx.send(session)?;
log::trace!(target: LOG_TAG, "SessionSignal pushed");
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct Session {
pub config: PyroscopeConfig,
pub reports: Vec<Report>,
pub from: u64,
pub until: u64,
}
impl Session {
pub fn new(until: u64, config: PyroscopeConfig, reports: Vec<Report>) -> Result<Self> {
log::info!(target: LOG_TAG, "Creating Session");
let time_range = get_time_range(until)?;
Ok(Self {
config,
reports,
from: time_range.from - 10,
until: time_range.until - 10,
})
}
pub fn send(self) -> Result<()> {
if self.reports.is_empty() {
return Ok(());
}
for report in &self.reports {
self.process(report)?;
}
Ok(())
}
fn process(&self, report: &Report) -> Result<()> {
log::info!(
target: LOG_TAG,
"Sending Session: {} - {}",
self.from,
self.until
);
let report_u8 = report.to_string().into_bytes();
if report_u8.is_empty() {
return Ok(());
}
let client = reqwest::blocking::Client::new();
let url = self.config.url.clone();
let application_name = merge_tags_with_app_name(
self.config.application_name.clone(),
report.metadata.tags.clone().into_iter().collect(),
)?;
let mut req_builder = client
.post(format!("{}/ingest", url))
.header("Content-Type", "binary/octet-stream");
if let Some(auth_token) = self.config.auth_token.clone() {
req_builder = req_builder.bearer_auth(auth_token);
}
req_builder
.query(&[
("name", application_name.as_str()),
("from", &format!("{}", self.from)),
("until", &format!("{}", self.until)),
("format", "folded"),
("sampleRate", &format!("{}", self.config.sample_rate)),
("spyName", self.config.spy_name.as_str()),
])
.body(report_u8)
.timeout(Duration::from_secs(10))
.send()?;
Ok(())
}
}