use std::{
collections::HashMap,
marker::PhantomData,
str::FromStr,
sync::{
mpsc::{self, Sender},
Arc, Condvar, Mutex,
},
thread::JoinHandle,
};
use crate::{
backend::{void_backend, BackendReady, BackendUninitialized, Report, Rule, Tag, VoidConfig},
error::Result,
session::{Session, SessionManager, SessionSignal},
timer::{Timer, TimerSignal},
utils::get_time_range,
PyroscopeError,
};
use json;
use crate::backend::BackendImpl;
use crate::pyroscope::Compression::GZIP;
use crate::pyroscope::ReportEncoding::{PPROF};
/// Log target passed as `target:` to the `log` macro calls in this file.
const LOG_TAG: &str = "Pyroscope::Agent";
/// Agent configuration.
///
/// Built either through [`PyroscopeConfig::new`] / `Default` and the consuming
/// setter methods on this type, or indirectly through `PyroscopeAgentBuilder`.
#[derive(Clone, Debug)]
pub struct PyroscopeConfig {
/// Server URL. `Default` uses `http://localhost:4040`.
pub url: String,
/// Application name. `PyroscopeAgentBuilder::build` may append a
/// backend-provided extension to it (skipped for the PPROF encoding).
pub application_name: String,
/// Tag key/value pairs. `PyroscopeAgentBuilder::build` registers each entry
/// on the backend as a `Rule::GlobalTag`.
pub tags: HashMap<String, String>,
/// Sample rate. Starts at 100 and is overwritten in
/// `PyroscopeAgentBuilder::build` with the value reported by the backend.
pub sample_rate: u32,
/// Spy name. Starts as `"undefined"` and is overwritten in
/// `PyroscopeAgentBuilder::build` with the value reported by the backend.
pub spy_name: String,
/// Optional authentication token.
pub auth_token: Option<String>,
/// Optional username/password pair.
pub basic_auth: Option<BasicAuth>,
/// Optional function mapping a `Report` to a `Report`.
// NOTE(review): where this hook is invoked is not visible in this file.
pub func: Option<fn(Report) -> Report>,
/// Optional compression setting; `None` by default.
pub compression: Option<Compression>,
/// Report encoding; `ReportEncoding::FOLDED` by default.
pub report_encoding: ReportEncoding,
/// Optional tenant identifier.
pub tenant_id: Option<String>,
/// HTTP header name/value pairs; empty by default.
pub http_headers: HashMap<String, String>,
}
/// Username/password pair stored in `PyroscopeConfig::basic_auth`.
#[derive(Clone, Debug)]
pub struct BasicAuth {
/// User name.
pub username: String,
/// Password.
pub password: String,
}
impl Default for PyroscopeConfig {
fn default() -> Self {
Self {
url: "http://localhost:4040".to_string(),
application_name: names::Generator::default()
.next()
.unwrap_or_else(|| "unassigned.app".to_string())
.replace('-', "."),
tags: HashMap::new(),
sample_rate: 100u32,
spy_name: "undefined".to_string(),
auth_token: None,
basic_auth: None,
func: None,
compression: None,
report_encoding: ReportEncoding::FOLDED,
tenant_id: None,
http_headers: HashMap::new(),
}
}
}
impl PyroscopeConfig {
    /// Creates a configuration for the given server `url` and
    /// `application_name`.
    ///
    /// All other fields start at their defaults: no tags, sample rate 100,
    /// spy name `"undefined"`, no authentication, no report hook, no
    /// compression, `ReportEncoding::FOLDED`, no tenant and no extra headers.
    pub fn new(url: impl AsRef<str>, application_name: impl AsRef<str>) -> Self {
        Self {
            url: url.as_ref().to_owned(),
            application_name: application_name.as_ref().to_owned(),
            tags: HashMap::new(),
            sample_rate: 100u32,
            spy_name: String::from("undefined"),
            auth_token: None,
            basic_auth: None,
            func: None,
            compression: None,
            report_encoding: ReportEncoding::FOLDED,
            tenant_id: None,
            http_headers: HashMap::new(),
        }
    }

    /// Replaces the server URL.
    pub fn url(self, url: impl AsRef<str>) -> Self {
        Self {
            url: url.as_ref().to_owned(),
            ..self
        }
    }

    /// Replaces the application name.
    pub fn application_name(self, application_name: impl AsRef<str>) -> Self {
        Self {
            application_name: application_name.as_ref().to_owned(),
            ..self
        }
    }

    /// Replaces the sample rate.
    pub fn sample_rate(self, sample_rate: u32) -> Self {
        Self {
            sample_rate,
            ..self
        }
    }

    /// Replaces the spy name.
    pub fn spy_name(self, spy_name: String) -> Self {
        Self { spy_name, ..self }
    }

    /// Sets the authentication token.
    pub fn auth_token(self, auth_token: String) -> Self {
        Self {
            auth_token: Some(auth_token),
            ..self
        }
    }

    /// Sets the basic-auth credentials.
    pub fn basic_auth(self, username: String, password: String) -> Self {
        Self {
            basic_auth: Some(BasicAuth { username, password }),
            ..self
        }
    }

    /// Sets the `Report -> Report` hook.
    pub fn func(self, func: fn(Report) -> Report) -> Self {
        Self {
            func: Some(func),
            ..self
        }
    }

    /// Replaces the whole tag map with the given key/value pairs.
    ///
    /// When a key appears more than once, the last pair wins.
    pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
        // The vector is owned, so consume it directly instead of cloning it first.
        let tags: HashMap<String, String> = tags
            .into_iter()
            .map(|(key, value)| (key.to_owned(), value.to_owned()))
            .collect();

        Self { tags, ..self }
    }

    /// Sets the compression.
    pub fn compression(self, compression: Compression) -> Self {
        Self {
            compression: Some(compression),
            ..self
        }
    }

    /// Replaces the report encoding.
    pub fn report_encoding(self, report_encoding: ReportEncoding) -> Self {
        Self {
            report_encoding,
            ..self
        }
    }

    /// Sets the tenant identifier.
    pub fn tenant_id(self, tenant_id: String) -> Self {
        Self {
            tenant_id: Some(tenant_id),
            ..self
        }
    }

    /// Replaces the HTTP header map.
    pub fn http_headers(self, http_headers: HashMap<String, String>) -> Self {
        Self {
            http_headers,
            ..self
        }
    }
}
/// Builder that pairs an uninitialized backend with a `PyroscopeConfig` and
/// turns them into a `PyroscopeAgent<PyroscopeAgentReady>` via `build`.
pub struct PyroscopeAgentBuilder {
/// Backend that `build` initializes.
backend: BackendImpl<BackendUninitialized>,
/// Configuration accumulated by the builder's setter methods.
config: PyroscopeConfig,
}
impl Default for PyroscopeAgentBuilder {
    /// Returns a builder using the void backend (with its default
    /// configuration) and `PyroscopeConfig::default()`.
    fn default() -> Self {
        let backend = void_backend(VoidConfig::default());
        let config = PyroscopeConfig::default();

        Self { backend, config }
    }
}
impl PyroscopeAgentBuilder {
pub fn new(url: impl AsRef<str>, application_name: impl AsRef<str>) -> Self {
Self {
backend: void_backend(VoidConfig::default()), config: PyroscopeConfig::new(url, application_name),
}
}
pub fn url(self, url: impl AsRef<str>) -> Self {
Self {
config: self.config.url(url),
..self
}
}
pub fn application_name(self, application_name: impl AsRef<str>) -> Self {
Self {
config: self.config.application_name(application_name),
..self
}
}
pub fn backend(self, backend: BackendImpl<BackendUninitialized>) -> Self {
Self { backend, ..self }
}
pub fn auth_token(self, auth_token: impl AsRef<str>) -> Self {
Self {
config: self.config.auth_token(auth_token.as_ref().to_owned()),
..self
}
}
pub fn basic_auth(self, username: impl AsRef<str>, password: impl AsRef<str>) -> Self {
Self {
config: self.config.basic_auth(username.as_ref().to_owned(), password.as_ref().to_owned()),
..self
}
}
pub fn func(self, func: fn(Report) -> Report) -> Self {
Self {
config: self.config.func(func),
..self
}
}
pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
Self {
config: self.config.tags(tags),
..self
}
}
pub fn compression(self, compression: Compression) -> Self {
Self {
config: self.config.compression(compression),
..self
}
}
pub fn report_encoding(self, report_encoding: ReportEncoding) -> Self {
Self {
config: self.config.report_encoding(report_encoding),
..self
}
}
pub fn tenant_id(self, tenant_id: String) -> Self {
Self {
config: self.config.tenant_id(tenant_id),
..self
}
}
pub fn http_headers(self, http_headers: HashMap<String, String>) -> Self {
Self {
config: self.config.http_headers(http_headers),
..self
}
}
pub fn build(self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
let config = self.config.sample_rate(self.backend.sample_rate()?);
let config = config.spy_name(self.backend.spy_name()?);
let config = match self.backend.spy_extension()? {
Some(extension) => {
if config.report_encoding == PPROF {
config
} else {
let application_name = config.application_name.clone();
config.application_name(format!("{}.{}", application_name, extension))
}
}
None => config,
};
for (key, value) in config.tags.iter() {
self.backend
.add_rule(crate::backend::Rule::GlobalTag(Tag::new(
key.to_owned(),
value.to_owned(),
)))?;
}
let backend_ready = self.backend.initialize()?;
log::trace!(target: LOG_TAG, "Backend initialized");
let timer = Timer::initialize(std::time::Duration::from_secs(10))?;
log::trace!(target: LOG_TAG, "Timer initialized");
let session_manager = SessionManager::new()?;
log::trace!(target: LOG_TAG, "SessionManager initialized");
Ok(PyroscopeAgent {
backend: backend_ready,
config,
timer,
session_manager,
tx: None,
handle: None,
running: Arc::new((
#[allow(clippy::mutex_atomic)]
Mutex::new(false),
Condvar::new(),
)),
_state: PhantomData,
})
}
}
/// Compression options selectable through `PyroscopeConfig::compression`.
#[derive(Clone, Debug)]
pub enum Compression {
    /// Gzip compression.
    GZIP,
}

impl FromStr for Compression {
    type Err = ();

    /// Parses the exact, case-sensitive string `"gzip"`; any other input
    /// yields `Err(())`.
    fn from_str(input: &str) -> std::result::Result<Compression, Self::Err> {
        if input == "gzip" {
            Ok(GZIP)
        } else {
            Err(())
        }
    }
}
/// Report encodings selectable through `PyroscopeConfig::report_encoding`.
#[derive(Clone, PartialEq, Debug)]
pub enum ReportEncoding {
    /// Folded (a.k.a. collapsed) encoding; the configuration default.
    FOLDED,
    /// pprof encoding.
    PPROF,
}

impl FromStr for ReportEncoding {
    type Err = ();

    /// Parses `"collapsed"` or `"folded"` as `FOLDED` and `"pprof"` as
    /// `PPROF`. Matching is case-sensitive; anything else yields `Err(())`.
    fn from_str(input: &str) -> std::result::Result<ReportEncoding, Self::Err> {
        match input {
            "collapsed" | "folded" => Ok(Self::FOLDED),
            "pprof" => Ok(Self::PPROF),
            _ => Err(()),
        }
    }
}
/// Marker trait for the type-level states of `PyroscopeAgent`.
pub trait PyroscopeAgentState {}
/// State whose `impl` block only offers the builder constructors
/// (`builder`, `default_builder`).
#[derive(Debug)]
pub struct PyroscopeAgentBare;
/// State of an agent returned by `PyroscopeAgentBuilder::build` or `stop`;
/// it can be started with `start`.
#[derive(Debug)]
pub struct PyroscopeAgentReady;
/// State of an agent returned by `start`; it can be stopped with `stop` and
/// accepts tag rules.
#[derive(Debug)]
pub struct PyroscopeAgentRunning;
impl PyroscopeAgentState for PyroscopeAgentBare {}
impl PyroscopeAgentState for PyroscopeAgentReady {}
impl PyroscopeAgentState for PyroscopeAgentRunning {}
/// Profiling agent parameterised by its current state `S`
/// (a `PyroscopeAgentState` marker type).
#[derive(Debug)]
pub struct PyroscopeAgent<S: PyroscopeAgentState> {
/// Timer to which `start` attaches a listener channel.
timer: Timer,
/// Session manager that receives `SessionSignal`s from the worker thread.
session_manager: SessionManager,
/// Sender side of the worker thread's channel; set by `start`, taken by `stop`.
tx: Option<Sender<TimerSignal>>,
/// Join handle of the worker thread spawned by `start`; joined in `shutdown`.
handle: Option<JoinHandle<Result<()>>>,
/// Running flag plus condition variable: set to `true` by `start`, cleared by
/// the worker thread, and waited on by `stop`.
running: Arc<(Mutex<bool>, Condvar)>,
/// Initialized profiling backend.
pub backend: BackendImpl<BackendReady>,
/// Agent configuration; a clone is handed to every `Session`.
pub config: PyroscopeConfig,
/// Zero-sized marker carrying the state type.
_state: PhantomData<S>,
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
    /// Moves every field into an agent tagged with the destination state `D`.
    ///
    /// Only the zero-sized state marker changes; no field is modified.
    fn transition<D: PyroscopeAgentState>(self) -> PyroscopeAgent<D> {
        let Self {
            timer,
            session_manager,
            tx,
            handle,
            running,
            backend,
            config,
            _state: _,
        } = self;

        PyroscopeAgent {
            timer,
            session_manager,
            tx,
            handle,
            running,
            backend,
            config,
            _state: PhantomData,
        }
    }
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
pub fn shutdown(mut self) {
log::debug!(target: LOG_TAG, "PyroscopeAgent::drop()");
match self.backend.shutdown() {
Ok(_) => log::debug!(target: LOG_TAG, "Backend shutdown"),
Err(e) => log::error!(target: LOG_TAG, "Backend shutdown error: {}", e),
}
match self.timer.drop_listeners() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer listeners"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer listeners"),
}
if let Some(handle) = self.timer.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer thread"),
}
}
match self.session_manager.push(SessionSignal::Kill) {
Ok(_) => log::trace!(target: LOG_TAG, "Sent kill signal to SessionManager"),
Err(_) => log::error!(
target: LOG_TAG,
"Error sending kill signal to SessionManager"
),
}
if let Some(handle) = self.session_manager.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped SessionManager thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping SessionManager thread"),
}
}
if let Some(handle) = self.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped main thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping main thread"),
}
}
log::debug!(target: LOG_TAG, "Agent Shutdown");
}
}
impl PyroscopeAgent<PyroscopeAgentBare> {
    /// Shortcut for `PyroscopeAgentBuilder::new(url, application_name)`.
    ///
    /// Both arguments share the single type parameter `S`, so they must be of
    /// the same type.
    pub fn builder<S: AsRef<str>>(url: S, application_name: S) -> PyroscopeAgentBuilder {
        PyroscopeAgentBuilder::new(url, application_name)
    }

    /// Shortcut for `PyroscopeAgentBuilder::default()`.
    pub fn default_builder() -> PyroscopeAgentBuilder {
        Default::default()
    }
}
impl PyroscopeAgent<PyroscopeAgentReady> {
    /// Starts the agent and moves it to the `Running` state.
    ///
    /// Marks the agent as running, attaches a fresh channel to the timer and
    /// spawns the worker thread. For every `TimerSignal::NextSnapshot(until)`
    /// the worker takes a report from the backend and sends it to the session
    /// manager as a new `Session`; on `TimerSignal::Terminate` it exits.
    ///
    /// Whenever the worker exits — on `Terminate`, on a closed channel, or on
    /// an error — it clears the running flag and notifies the condition
    /// variable. Without this, an error while handling the final snapshot sent
    /// by `stop` would leave `stop` waiting on the condition variable forever.
    ///
    /// # Errors
    ///
    /// Returns an error if the running-flag mutex is poisoned or the timer
    /// listener cannot be attached. Errors inside the worker thread are
    /// returned through its `JoinHandle`.
    pub fn start(mut self) -> Result<PyroscopeAgent<PyroscopeAgentRunning>> {
        log::debug!(target: LOG_TAG, "Starting");

        let backend = Arc::clone(&self.backend.backend);
        let pair = Arc::clone(&self.running);

        // Flag the agent as running; the guard is released at the end of the scope.
        {
            let (lock, _cvar) = &*pair;
            let mut running = lock.lock()?;
            *running = true;
        }

        let (tx, rx) = mpsc::channel();
        self.timer.attach_listener(tx.clone())?;
        self.tx = Some(tx);

        let config = self.config.clone();
        let stx = self.session_manager.tx.clone();

        self.handle = Some(std::thread::spawn(move || {
            log::trace!(target: LOG_TAG, "Main Thread started");

            // Run the signal loop in a closure so that early `?` exits still
            // fall through to the running-flag cleanup below.
            let outcome = (|| -> Result<()> {
                while let Ok(signal) = rx.recv() {
                    match signal {
                        TimerSignal::NextSnapshot(until) => {
                            log::trace!(target: LOG_TAG, "Sending session {}", until);
                            let report = backend
                                .lock()?
                                .as_mut()
                                .ok_or_else(|| {
                                    PyroscopeError::AdHoc(
                                        "PyroscopeAgent - Failed to unwrap backend".to_string(),
                                    )
                                })?
                                .report()?;
                            stx.send(SessionSignal::Session(Session::new(
                                until,
                                config.clone(),
                                report,
                            )?))?;
                        }
                        TimerSignal::Terminate => {
                            log::trace!(target: LOG_TAG, "Session Killed");
                            return Ok(());
                        }
                    }
                }
                Ok(())
            })();

            // Always release anyone blocked in `stop`, whatever the outcome.
            let (lock, cvar) = &*pair;
            let mut running = lock.lock()?;
            *running = false;
            cvar.notify_one();
            drop(running);

            outcome
        }));

        Ok(self.transition())
    }
}
impl PyroscopeAgent<PyroscopeAgentRunning> {
pub fn stop(mut self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
log::debug!(target: LOG_TAG, "Stopping");
if let Some(sender) = self.tx.take() {
sender.send(TimerSignal::NextSnapshot(get_time_range(0)?.until))?;
sender.send(TimerSignal::Terminate)?;
} else {
log::error!("PyroscopeAgent - Missing sender")
}
let pair = Arc::clone(&self.running);
let (lock, cvar) = &*pair;
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;
Ok(self.transition())
}
pub fn tag_wrapper(
&self,
) -> (
impl Fn(String, String) -> Result<()>,
impl Fn(String, String) -> Result<()>,
) {
let backend_add = self.backend.backend.clone();
let backend_remove = self.backend.backend.clone();
(
move |key, value| {
let thread_id = crate::utils::pthread_self()?;
let rule = Rule::ThreadTag(thread_id, Tag::new(key, value));
let backend = backend_add.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.add_rule(rule)?;
Ok(())
},
move |key, value| {
let thread_id = crate::utils::pthread_self()?;
let rule = Rule::ThreadTag(thread_id, Tag::new(key, value));
let backend = backend_remove.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.remove_rule(rule)?;
Ok(())
},
)
}
pub fn add_global_tag(&self, tag: Tag) -> Result<()> {
let rule = Rule::GlobalTag(tag);
self.backend.add_rule(rule)?;
Ok(())
}
pub fn remove_global_tag(&self, tag: Tag) -> Result<()> {
let rule = Rule::GlobalTag(tag);
self.backend.remove_rule(rule)?;
Ok(())
}
pub fn add_thread_tag(&self, thread_id: u64, tag: Tag) -> Result<()> {
let rule = Rule::ThreadTag(thread_id, tag);
self.backend.add_rule(rule)?;
Ok(())
}
pub fn remove_thread_tag(&self, thread_id: u64, tag: Tag) -> Result<()> {
let rule = Rule::ThreadTag(thread_id, tag);
self.backend.remove_rule(rule)?;
Ok(())
}
}
/// Parses a JSON object of string values into an HTTP header map.
///
/// # Errors
///
/// Returns an error if the input is not valid JSON, if the top-level value is
/// not an object, or if any entry's value is not a string. The first
/// offending entry aborts parsing.
pub fn parse_http_headers_json(http_headers_json: String) -> Result<HashMap<String, String>> {
    let parsed = json::parse(&http_headers_json)?;

    if !parsed.is_object() {
        return Err(PyroscopeError::AdHoc(format!("expected object, got {}", parsed)));
    }

    parsed
        .entries()
        .map(|(name, value)| {
            if value.is_string() {
                Ok((name.to_string(), value.to_string()))
            } else {
                Err(PyroscopeError::AdHoc(format!(
                    "invalid http header value, not a string: {}",
                    value
                )))
            }
        })
        .collect()
}