use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread,
thread::JoinHandle,
};
use crate::pyroscope::PyroscopeConfig;
use crate::utils::merge_tags_with_app_name;
use crate::Result;
#[derive(Debug)]
pub enum SessionSignal {
Session(Session),
Kill,
}
#[derive(Debug)]
pub struct SessionManager {
pub handle: Option<JoinHandle<Result<()>>>,
pub tx: SyncSender<SessionSignal>,
}
impl SessionManager {
pub fn new() -> Result<Self> {
log::info!("SessionManager - Creating SessionManager");
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);
let handle = Some(thread::spawn(move || {
log::trace!("SessionManager - SessionManager thread started");
while let Ok(signal) = rx.recv() {
match signal {
SessionSignal::Session(session) => {
session.send()?;
log::trace!("SessionManager - Session sent");
}
SessionSignal::Kill => {
log::trace!("SessionManager - Kill signal received");
return Ok(());
}
}
}
Ok(())
}));
Ok(SessionManager { handle, tx })
}
pub fn push(&self, session: SessionSignal) -> Result<()> {
self.tx.send(session)?;
log::trace!("SessionManager - SessionSignal pushed");
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct Session {
pub config: PyroscopeConfig,
pub report: Vec<u8>,
pub from: u64,
pub until: u64,
}
impl Session {
pub fn new(mut until: u64, config: PyroscopeConfig, report: Vec<u8>) -> Result<Self> {
log::info!("Session - Creating Session");
if until == 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
until = now
.checked_add(10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap())
.unwrap();
}
let from = until.checked_sub(10u64).unwrap();
Ok(Self {
config,
report,
from,
until,
})
}
pub fn send(self) -> Result<()> {
log::info!("Session - Sending Session");
if self.report.is_empty() {
return Ok(());
}
let client = reqwest::blocking::Client::new();
let url = self.config.url.clone();
let application_name = merge_tags_with_app_name(
self.config.application_name.clone(),
self.config.tags.clone(),
)?;
client
.post(format!("{}/ingest", url))
.header("Content-Type", "binary/octet-stream")
.query(&[
("name", application_name.as_str()),
("from", &format!("{}", self.from)),
("until", &format!("{}", self.until)),
("format", "folded"),
("sampleRate", &format!("{}", self.config.sample_rate)),
("spyName", "pyroscope-rs"),
])
.body(self.report)
.timeout(std::time::Duration::from_secs(10))
.send()?;
Ok(())
}
}