use std::{
collections::HashMap,
marker::PhantomData,
sync::{
mpsc::{self, Sender},
Arc, Condvar, Mutex,
},
thread::JoinHandle,
};
use crate::{
backend::{BackendReady, BackendUninitialized, Report, Tag},
error::Result,
session::{Session, SessionManager, SessionSignal},
timer::{Timer, TimerSignal},
utils::get_time_range,
PyroscopeError,
};
use crate::backend::{BackendImpl, ThreadTag};
const LOG_TAG: &str = "Pyroscope::Agent";
const PPROFRS_SPY_NAME: &str = "pyroscope-rs";
const PPROFRS_SPY_VERSION: &str = env!("CARGO_PKG_VERSION");
#[derive(Clone, Debug)]
pub struct PyroscopeConfig {
pub url: String,
pub application_name: String,
pub tags: HashMap<String, String>,
pub sample_rate: u32,
pub spy_name: String,
pub spy_version: String,
pub basic_auth: Option<BasicAuth>,
pub func: Option<fn(Report) -> Report>,
pub tenant_id: Option<String>,
pub http_headers: HashMap<String, String>,
}
#[derive(Clone, Debug)]
pub struct BasicAuth {
pub username: String,
pub password: String,
}
impl Default for PyroscopeConfig {
fn default() -> Self {
Self {
url: "http://localhost:4040".to_string(),
application_name: names::Generator::default()
.next()
.unwrap_or_else(|| "unassigned.app".to_string())
.replace('-', "."),
tags: HashMap::new(),
sample_rate: 100u32,
spy_name: PPROFRS_SPY_NAME.to_string(),
spy_version: PPROFRS_SPY_VERSION.to_string(),
basic_auth: None,
func: None,
tenant_id: None,
http_headers: HashMap::new(),
}
}
}
impl PyroscopeConfig {
pub fn new(
url: impl AsRef<str>,
application_name: impl AsRef<str>,
sample_rate: u32,
spy_name: impl AsRef<str>,
spy_version: impl AsRef<str>,
) -> Self {
Self {
url: url.as_ref().to_owned(),
application_name: application_name.as_ref().to_owned(),
tags: HashMap::new(),
sample_rate,
spy_name: spy_name.as_ref().to_owned(),
spy_version: spy_version.as_ref().to_owned(),
basic_auth: None,
func: None,
tenant_id: None,
http_headers: HashMap::new(),
}
}
pub fn url(self, url: impl AsRef<str>) -> Self {
Self {
url: url.as_ref().to_owned(),
..self
}
}
pub fn basic_auth(self, username: String, password: String) -> Self {
Self {
basic_auth: Some(BasicAuth { username, password }),
..self
}
}
pub fn func(self, func: fn(Report) -> Report) -> Self {
Self {
func: Some(func),
..self
}
}
pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
let tags_hashmap: HashMap<String, String> = tags
.to_owned()
.iter()
.cloned()
.map(|(a, b)| (a.to_owned(), b.to_owned()))
.collect();
Self {
tags: tags_hashmap,
..self
}
}
pub fn tenant_id(self, tenant_id: String) -> Self {
Self {
tenant_id: Some(tenant_id),
..self
}
}
pub fn http_headers(self, http_headers: HashMap<String, String>) -> Self {
Self {
http_headers,
..self
}
}
}
pub struct PyroscopeAgentBuilder {
backend: BackendImpl<BackendUninitialized>,
config: PyroscopeConfig,
}
impl PyroscopeAgentBuilder {
pub fn new(
url: impl AsRef<str>,
application_name: impl AsRef<str>,
sample_rate: u32,
spy_name: impl AsRef<str>,
spy_version: impl AsRef<str>,
backend: BackendImpl<BackendUninitialized>,
) -> Self {
Self {
backend,
config: PyroscopeConfig::new(url, application_name, sample_rate, spy_name, spy_version),
}
}
pub fn url(self, url: impl AsRef<str>) -> Self {
Self {
config: self.config.url(url),
..self
}
}
pub fn basic_auth(self, username: impl AsRef<str>, password: impl AsRef<str>) -> Self {
Self {
config: self
.config
.basic_auth(username.as_ref().to_owned(), password.as_ref().to_owned()),
..self
}
}
pub fn func(self, func: fn(Report) -> Report) -> Self {
Self {
config: self.config.func(func),
..self
}
}
pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
Self {
config: self.config.tags(tags),
..self
}
}
pub fn tenant_id(self, tenant_id: String) -> Self {
Self {
config: self.config.tenant_id(tenant_id),
..self
}
}
pub fn http_headers(self, http_headers: HashMap<String, String>) -> Self {
Self {
config: self.config.http_headers(http_headers),
..self
}
}
pub fn build(self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
let config = self.config;
let backend_ready = self.backend.initialize()?;
log::trace!(target: LOG_TAG, "Backend initialized");
let timer = Timer::initialize(std::time::Duration::from_secs(10))?;
log::trace!(target: LOG_TAG, "Timer initialized");
let session_manager = SessionManager::new()?;
log::trace!(target: LOG_TAG, "SessionManager initialized");
Ok(PyroscopeAgent {
backend: backend_ready,
config,
timer,
session_manager,
tx: None,
handle: None,
running: Arc::new((
#[allow(clippy::mutex_atomic)]
Mutex::new(false),
Condvar::new(),
)),
_state: PhantomData,
})
}
}
pub trait PyroscopeAgentState {}
#[derive(Debug)]
pub struct PyroscopeAgentBare;
#[derive(Debug)]
pub struct PyroscopeAgentReady;
#[derive(Debug)]
pub struct PyroscopeAgentRunning;
impl PyroscopeAgentState for PyroscopeAgentBare {}
impl PyroscopeAgentState for PyroscopeAgentReady {}
impl PyroscopeAgentState for PyroscopeAgentRunning {}
pub struct PyroscopeAgent<S: PyroscopeAgentState> {
timer: Timer,
session_manager: SessionManager,
tx: Option<Sender<TimerSignal>>,
handle: Option<JoinHandle<Result<()>>>,
running: Arc<(Mutex<bool>, Condvar)>,
pub backend: BackendImpl<BackendReady>,
pub config: PyroscopeConfig,
_state: PhantomData<S>,
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
fn transition<D: PyroscopeAgentState>(self) -> PyroscopeAgent<D> {
PyroscopeAgent {
timer: self.timer,
session_manager: self.session_manager,
tx: self.tx,
handle: self.handle,
running: self.running,
backend: self.backend,
config: self.config,
_state: PhantomData,
}
}
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
pub fn shutdown(mut self) {
log::debug!(target: LOG_TAG, "PyroscopeAgent::drop()");
match self.backend.shutdown() {
Ok(_) => log::debug!(target: LOG_TAG, "Backend shutdown"),
Err(e) => log::error!(target: LOG_TAG, "Backend shutdown error: {}", e),
}
match self.timer.drop_listeners() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer listeners"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer listeners"),
}
if let Some(handle) = self.timer.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer thread"),
}
}
match self.session_manager.push(SessionSignal::Kill) {
Ok(_) => log::trace!(target: LOG_TAG, "Sent kill signal to SessionManager"),
Err(_) => log::error!(
target: LOG_TAG,
"Error sending kill signal to SessionManager"
),
}
if let Some(handle) = self.session_manager.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped SessionManager thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping SessionManager thread"),
}
}
if let Some(handle) = self.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped main thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping main thread"),
}
}
log::debug!(target: LOG_TAG, "Agent Shutdown");
}
}
impl PyroscopeAgent<PyroscopeAgentReady> {
pub fn start(mut self) -> Result<PyroscopeAgent<PyroscopeAgentRunning>> {
log::debug!(target: LOG_TAG, "Starting");
let backend = Arc::clone(&self.backend.backend);
let pair = Arc::clone(&self.running);
let (lock, _cvar) = &*pair;
let mut running = lock.lock()?;
*running = true;
drop(running);
let (tx, rx) = mpsc::channel();
self.timer.attach_listener(tx.clone())?;
self.tx = Some(tx);
let config = self.config.clone();
let stx = self.session_manager.tx.clone();
self.handle = Some(std::thread::spawn(move || {
log::trace!(target: LOG_TAG, "Main Thread started");
while let Ok(signal) = rx.recv() {
match signal {
TimerSignal::NextSnapshot(until) => {
log::trace!(target: LOG_TAG, "Sending session {}", until);
let report = backend
.lock()?
.as_mut()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.report()?;
stx.send(SessionSignal::Session(Box::new(Session::new(
until,
config.clone(),
report,
)?)))?
}
TimerSignal::Terminate => {
log::trace!(target: LOG_TAG, "Session Killed");
let (lock, cvar) = &*pair;
let mut running = lock.lock()?;
*running = false;
cvar.notify_one();
return Ok(());
}
}
}
Ok(())
}));
Ok(self.transition())
}
}
impl PyroscopeAgent<PyroscopeAgentRunning> {
pub fn stop(mut self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
log::debug!(target: LOG_TAG, "Stopping");
if let Some(sender) = self.tx.take() {
sender.send(TimerSignal::NextSnapshot(get_time_range(0)?.until))?;
sender.send(TimerSignal::Terminate)?;
} else {
log::error!("PyroscopeAgent - Missing sender")
}
let pair = Arc::clone(&self.running);
let (lock, cvar) = &*pair;
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;
Ok(self.transition())
}
#[allow(clippy::type_complexity)]
pub fn tag_wrapper(
&self,
) -> (
impl Fn(String, String) -> Result<()>,
impl Fn(String, String) -> Result<()>,
) {
let backend_add = self.backend.backend.clone();
let backend_remove = self.backend.backend.clone();
(
move |key, value| {
let thread_id = crate::utils::ThreadId::pthread_self();
let rule = ThreadTag::new(thread_id, Tag::new(key, value));
let backend = backend_add.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.add_tag(rule)?;
Ok(())
},
move |key, value| {
let thread_id = crate::utils::ThreadId::pthread_self();
let rule = ThreadTag::new(thread_id, Tag::new(key, value));
let backend = backend_remove.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.remove_tag(rule)?;
Ok(())
},
)
}
pub fn add_thread_tag(&self, thread_id: crate::utils::ThreadId, tag: Tag) -> Result<()> {
let rule = ThreadTag::new(thread_id, tag);
self.backend.add_tag(rule)?;
Ok(())
}
pub fn remove_thread_tag(&self, thread_id: crate::utils::ThreadId, tag: Tag) -> Result<()> {
let rule = ThreadTag::new(thread_id, tag);
self.backend.remove_tag(rule)?;
Ok(())
}
}
pub fn parse_http_headers_json(http_headers_json: String) -> Result<HashMap<String, String>> {
let mut http_headers = HashMap::new();
let parsed: serde_json::Value = serde_json::from_str(&http_headers_json)?;
let parsed = parsed
.as_object()
.ok_or_else(|| PyroscopeError::AdHoc(format!("expected object, got {}", parsed)))?;
for (k, v) in parsed {
if let Some(value) = v.as_str() {
http_headers.insert(k.to_string(), value.to_string());
} else {
return Err(PyroscopeError::AdHoc(format!(
"invalid http header value, not a string: {}",
v
)));
}
}
Ok(http_headers)
}
pub fn parse_vec_string_json(s: String) -> Result<Vec<String>> {
let parsed: serde_json::Value = serde_json::from_str(&s)?;
let parsed = parsed
.as_array()
.ok_or_else(|| PyroscopeError::AdHoc(format!("expected array, got {}", parsed)))?;
let mut res = Vec::with_capacity(parsed.len());
for v in parsed {
if let Some(s) = v.as_str() {
res.push(s.to_string());
} else {
return Err(PyroscopeError::AdHoc(format!(
"invalid element value, not a string: {}",
v
)));
}
}
Ok(res)
}