use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};
use crate::backends::pprof::Pprof;
use crate::backends::Backend;
use crate::error::Result;
use crate::session::Session;
use crate::session::SessionManager;
use crate::session::SessionSignal;
use crate::timer::Timer;
/// Configuration values carried by the agent and cloned into every session
/// it produces.
#[derive(Clone, Debug)]
pub struct PyroscopeConfig {
    /// URL supplied by the caller, stored verbatim.
    /// NOTE(review): presumably the address of the Pyroscope server — confirm
    /// against the code that consumes it.
    pub url: String,
    /// Application name supplied by the caller, stored verbatim.
    pub application_name: String,
    /// Key/value tags attached to this configuration.
    pub tags: HashMap<String, String>,
    /// Sample rate handed to the backend's `initialize` when the agent is
    /// built. Defaults to `100`.
    pub sample_rate: i32,
}

impl PyroscopeConfig {
    /// Creates a configuration with no tags and a sample rate of `100`.
    ///
    /// Both arguments are copied into owned `String`s.
    pub fn new<S: AsRef<str>>(url: S, application_name: S) -> Self {
        Self {
            url: url.as_ref().to_owned(),
            application_name: application_name.as_ref().to_owned(),
            tags: HashMap::new(),
            sample_rate: 100i32,
        }
    }

    /// Returns the configuration with `sample_rate` replaced; every other
    /// field is carried over unchanged.
    pub fn sample_rate(self, sample_rate: i32) -> Self {
        Self {
            sample_rate,
            ..self
        }
    }

    /// Returns the configuration with its tags **replaced** by `tags`.
    ///
    /// Previously set tags are discarded, not merged. When the same key
    /// appears more than once in `tags`, the last occurrence wins.
    pub fn tags(self, tags: &[(&str, &str)]) -> Self {
        // Map straight off the borrowed slice: copying it into a temporary
        // `Vec` first only costs an allocation.
        let tags: HashMap<String, String> = tags
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect();
        Self { tags, ..self }
    }
}
/// Builder that collects a backend and a `PyroscopeConfig` and turns them
/// into a `PyroscopeAgent` via `build()`.
pub struct PyroscopeAgentBuilder {
/// Profiling backend, shared behind `Arc<Mutex<..>>`; `new()` installs
/// `Pprof::default()` and `backend()` replaces it.
backend: Arc<Mutex<dyn Backend>>,
/// Configuration that is moved into the agent by `build()`.
config: PyroscopeConfig,
}
impl PyroscopeAgentBuilder {
    /// Creates a builder that uses the default `Pprof` backend and a fresh
    /// `PyroscopeConfig` for `url` / `application_name`.
    pub fn new<S: AsRef<str>>(url: S, application_name: S) -> Self {
        let backend: Arc<Mutex<dyn Backend>> = Arc::new(Mutex::new(Pprof::default()));
        let config = PyroscopeConfig::new(url, application_name);
        Self { backend, config }
    }

    /// Swaps the backend for `backend`, keeping the configuration as is.
    pub fn backend<T: 'static>(self, backend: T) -> Self
    where T: Backend {
        let shared: Arc<Mutex<dyn Backend>> = Arc::new(Mutex::new(backend));
        Self {
            backend: shared,
            config: self.config,
        }
    }

    /// Sets the sample rate stored in the configuration.
    pub fn sample_rate(self, sample_rate: i32) -> Self {
        let Self { backend, config } = self;
        Self {
            backend,
            config: config.sample_rate(sample_rate),
        }
    }

    /// Replaces the tags stored in the configuration with `tags`.
    pub fn tags(self, tags: &[(&str, &str)]) -> Self {
        let Self { backend, config } = self;
        Self {
            backend,
            config: config.tags(tags),
        }
    }

    /// Consumes the builder and assembles a `PyroscopeAgent`.
    ///
    /// The backend is initialized with the configured sample rate, then a
    /// `Timer` and a `SessionManager` are created, in that order. The returned
    /// agent has no sender, no worker thread handle, and its `running` flag is
    /// `false`.
    ///
    /// # Errors
    ///
    /// Propagates a failure to lock the backend mutex and any error returned
    /// by the backend, timer or session manager initialization.
    pub fn build(self) -> Result<PyroscopeAgent> {
        let Self { backend, config } = self;

        backend.lock()?.initialize(config.sample_rate)?;
        log::trace!("PyroscopeAgent - Backend initialized");

        let timer = Timer::default().initialize()?;
        log::trace!("PyroscopeAgent - Timer initialized");

        let session_manager = SessionManager::new()?;
        log::trace!("PyroscopeAgent - SessionManager initialized");

        let running = Arc::new((Mutex::new(false), Condvar::new()));

        Ok(PyroscopeAgent {
            timer,
            session_manager,
            tx: None,
            handle: None,
            running,
            backend,
            config,
        })
    }
}
/// Profiling agent: owns the timer, the session manager, the backend and the
/// worker thread that turns timer ticks into sessions.
#[derive(Debug)]
pub struct PyroscopeAgent {
/// Timer to which `_start` attaches a `Sender<u64>` listener.
timer: Timer,
/// Receives `SessionSignal` values: sessions from the worker thread and
/// `Kill` when the agent is dropped.
session_manager: SessionManager,
/// Sender half of the worker's channel. Set by `_start`, taken by `_stop`,
/// which uses it to send `0` as the termination value.
tx: Option<Sender<u64>>,
/// Join handle of the worker thread spawned by `_start`; `None` until then.
handle: Option<JoinHandle<Result<()>>>,
/// Flag + condvar pair. `_start` sets the flag to `true`; the worker thread
/// clears it and notifies after handling the `0` sent by `_stop`, which
/// blocks on the condvar until then.
running: Arc<(Mutex<bool>, Condvar)>,
/// Profiling backend shared with the worker thread.
pub backend: Arc<Mutex<dyn Backend>>,
/// Agent configuration; a clone is moved into the worker thread on start.
pub config: PyroscopeConfig,
}
impl Drop for PyroscopeAgent {
    /// Shuts down the timer, the session manager and the worker thread.
    ///
    /// Every step is best-effort: failures are logged and the teardown moves
    /// on. A thread handle that is absent (for example the worker handle of an
    /// agent that was built but never started) is skipped instead of
    /// panicking inside `drop`. The backend is not stopped here.
    fn drop(&mut self) {
        log::debug!("PyroscopeAgent::drop()");

        match self.timer.drop_listeners() {
            Ok(_) => log::trace!("PyroscopeAgent - Dropped timer listeners"),
            Err(_) => log::error!("PyroscopeAgent - Error Dropping timer listeners"),
        }

        if let Some(handle) = self.timer.handle.take() {
            match handle.join() {
                Ok(_) => log::trace!("PyroscopeAgent - Dropped timer thread"),
                Err(_) => log::error!("PyroscopeAgent - Error Dropping timer thread"),
            }
        }

        match self.session_manager.push(SessionSignal::Kill) {
            Ok(_) => log::trace!("PyroscopeAgent - Sent kill signal to SessionManager"),
            Err(_) => log::error!("PyroscopeAgent - Error sending kill signal to SessionManager"),
        }

        if let Some(handle) = self.session_manager.handle.take() {
            match handle.join() {
                Ok(_) => log::trace!("PyroscopeAgent - Dropped SessionManager thread"),
                Err(_) => log::error!("PyroscopeAgent - Error Dropping SessionManager thread"),
            }
        }

        // If the agent is dropped while still started, it holds a sender for
        // the worker's channel. Release it before joining so the worker's
        // `recv()` can observe the disconnect instead of blocking forever.
        self.tx = None;

        // `handle` is `None` when the agent was never started.
        if let Some(handle) = self.handle.take() {
            match handle.join() {
                Ok(_) => log::trace!("PyroscopeAgent - Dropped main thread"),
                Err(_) => log::error!("PyroscopeAgent - Error Dropping main thread"),
            }
        }
    }
}
impl PyroscopeAgent {
pub fn builder<S: AsRef<str>>(url: S, application_name: S) -> PyroscopeAgentBuilder {
PyroscopeAgentBuilder::new(url, application_name)
}
fn _start(&mut self) -> Result<()> {
log::debug!("PyroscopeAgent - Starting");
let backend = Arc::clone(&self.backend);
backend.lock()?.start()?;
let pair = Arc::clone(&self.running);
let (lock, _cvar) = &*pair;
let mut running = lock.lock()?;
*running = true;
drop(running);
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
self.timer.attach_listener(tx.clone())?;
self.tx = Some(tx);
let config = self.config.clone();
let stx = self.session_manager.tx.clone();
self.handle = Some(std::thread::spawn(move || {
log::trace!("PyroscopeAgent - Main Thread started");
while let Ok(time) = rx.recv() {
log::trace!("PyroscopeAgent - Sending session {}", time);
let report = backend.lock()?.report()?;
stx.send(SessionSignal::Session(Session::new(
time,
config.clone(),
report,
)?))?;
if time == 0 {
log::trace!("PyroscopeAgent - Session Killed");
let (lock, cvar) = &*pair;
let mut running = lock.lock()?;
*running = false;
cvar.notify_one();
return Ok(());
}
}
Ok(())
}));
Ok(())
}
pub fn start(&mut self) {
match self._start() {
Ok(_) => log::trace!("PyroscopeAgent - Agent started"),
Err(_) => log::error!("PyroscopeAgent - Error starting agent"),
}
}
fn _stop(&mut self) -> Result<()> {
log::debug!("PyroscopeAgent - Stopping");
self.tx.take().unwrap().send(0)?;
let pair = Arc::clone(&self.running);
let (lock, cvar) = &*pair;
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;
let backend = Arc::clone(&self.backend);
backend.lock()?.stop()?;
Ok(())
}
pub fn stop(&mut self) {
match self._stop() {
Ok(_) => log::trace!("PyroscopeAgent - Agent stopped"),
Err(_) => log::error!("PyroscopeAgent - Error stopping agent"),
}
}
pub fn add_tags(&mut self, tags: &[(&str, &str)]) -> Result<()> {
log::debug!("PyroscopeAgent - Adding tags");
if tags.is_empty() {
return Ok(());
}
self.stop();
let tags_hashmap: HashMap<String, String> = tags
.to_owned()
.iter()
.cloned()
.map(|(a, b)| (a.to_owned(), b.to_owned()))
.collect();
self.config.tags.extend(tags_hashmap);
self.start();
Ok(())
}
pub fn remove_tags(&mut self, tags: &[&str]) -> Result<()> {
log::debug!("PyroscopeAgent - Removing tags");
if tags.is_empty() {
return Ok(());
}
self.stop();
tags.iter().for_each(|key| {
self.config.tags.remove(key.to_owned());
});
self.start();
Ok(())
}
}