use std::{
collections::HashMap,
marker::PhantomData,
sync::{
mpsc::{self, Sender},
Arc, Condvar, Mutex,
},
thread::JoinHandle,
};
use crate::{
backend::{void_backend, BackendReady, BackendUninitialized, Rule, Tag, VoidConfig},
error::Result,
session::{Session, SessionManager, SessionSignal},
timer::{Timer, TimerSignal},
utils::get_time_range,
PyroscopeError,
};
use crate::backend::BackendImpl;
/// Value passed as `target:` to the `log` macros in this file.
const LOG_TAG: &str = "Pyroscope::Agent";
/// Pyroscope Agent Configuration. This is the configuration that is passed to the agent.
///
/// Every field is public. A value can be created with [`PyroscopeConfig::new`]
/// or through `Default`, and adjusted with the consuming setter methods
/// (`url`, `application_name`, `sample_rate`, `spy_name`, `auth_token`, `tags`).
///
/// # Example
/// ```
/// use pyroscope::pyroscope::PyroscopeConfig;
/// let config = PyroscopeConfig::new("http://localhost:8080", "my-app");
/// ```
#[derive(Clone, Debug)]
pub struct PyroscopeConfig {
/// Pyroscope Server Address. `Default` uses `http://localhost:4040`.
pub url: String,
/// Application Name. `Default` generates one with the `names` crate.
pub application_name: String,
/// Tags, stored as owned key/value pairs. Empty unless set with `tags()`.
pub tags: HashMap<String, String>,
/// Sample Rate. Both `new()` and `Default` start at 100.
pub sample_rate: u32,
/// Spy Name. Starts as `"undefined"`; the agent builder overwrites it with
/// the name reported by the backend.
pub spy_name: String,
/// Authentication Token. `None` means no token has been configured.
pub auth_token: Option<String>,
}
impl Default for PyroscopeConfig {
    /// Build a configuration that targets `http://localhost:4040` and uses a
    /// generated application name.
    ///
    /// The name comes from `names::Generator`; if the generator yields
    /// nothing, `"unassigned.app"` is used. Every `-` in the name is replaced
    /// with `.`. The remaining fields match the defaults of
    /// `PyroscopeConfig::new`.
    fn default() -> Self {
        let generated = names::Generator::default()
            .next()
            .unwrap_or_else(|| "unassigned.app".to_string());
        let application_name = generated.replace('-', ".");

        Self {
            url: String::from("http://localhost:4040"),
            application_name,
            tags: HashMap::new(),
            sample_rate: 100,
            spy_name: String::from("undefined"),
            auth_token: None,
        }
    }
}
impl PyroscopeConfig {
    /// Create a new PyroscopeConfig object. url and application_name are required.
    ///
    /// The remaining fields start from fixed values: no tags, a sample rate
    /// of 100, a spy name of `"undefined"` and no authentication token. Use
    /// the setter methods to change them.
    ///
    /// # Example
    /// ```ignore
    /// let config = PyroscopeConfig::new("http://localhost:8080", "my-app");
    /// ```
    pub fn new(url: impl AsRef<str>, application_name: impl AsRef<str>) -> Self {
        Self {
            url: url.as_ref().to_owned(),
            application_name: application_name.as_ref().to_owned(),
            tags: HashMap::new(),
            sample_rate: 100u32,
            // Placeholder: the agent builder replaces it with the backend's spy name.
            spy_name: String::from("undefined"),
            auth_token: None,
        }
    }

    /// Set the Pyroscope Server URL.
    pub fn url(self, url: impl AsRef<str>) -> Self {
        Self {
            url: url.as_ref().to_owned(),
            ..self
        }
    }

    /// Set the Application Name.
    pub fn application_name(self, application_name: impl AsRef<str>) -> Self {
        Self {
            application_name: application_name.as_ref().to_owned(),
            ..self
        }
    }

    /// Set the Sample rate.
    pub fn sample_rate(self, sample_rate: u32) -> Self {
        Self {
            sample_rate,
            ..self
        }
    }

    /// Set the Spy Name.
    pub fn spy_name(self, spy_name: String) -> Self {
        Self { spy_name, ..self }
    }

    /// Set the Authentication Token.
    pub fn auth_token(self, auth_token: String) -> Self {
        Self {
            auth_token: Some(auth_token),
            ..self
        }
    }

    /// Set the tags.
    ///
    /// The given pairs replace any previously configured tags. When a key
    /// occurs more than once, the last pair wins.
    ///
    /// # Example
    /// ```ignore
    /// use pyroscope::pyroscope::PyroscopeConfig;
    /// let config = PyroscopeConfig::new("http://localhost:8080", "my-app")
    ///     .tags(vec![("env", "dev")]);
    /// ```
    pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
        // The vector is already owned, so consume it directly instead of
        // cloning it first; only the borrowed strings need to be copied.
        let tags: HashMap<String, String> = tags
            .into_iter()
            .map(|(key, value)| (key.to_owned(), value.to_owned()))
            .collect();

        Self { tags, ..self }
    }
}
/// PyroscopeAgent Builder
///
/// Collects a backend and a [`PyroscopeConfig`]; `build()` turns them into a
/// `PyroscopeAgent` in the Ready state.
///
/// Alternatively, you can use PyroscopeAgent::builder() which is a short-hand
/// for calling PyroscopeAgentBuilder::new()
///
/// # Example
/// ```ignore
/// use pyroscope::pyroscope::PyroscopeAgentBuilder;
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app");
/// let agent = builder.build()?;
/// ```
pub struct PyroscopeAgentBuilder {
/// Profiler backend, still uninitialized; `build()` initializes it.
backend: BackendImpl<BackendUninitialized>,
/// Configuration Object
config: PyroscopeConfig,
}
impl Default for PyroscopeAgentBuilder {
    /// Create a builder that uses the void backend with its default
    /// configuration and `PyroscopeConfig::default()`.
    fn default() -> Self {
        let backend = void_backend(VoidConfig::default());
        let config = PyroscopeConfig::default();

        Self { backend, config }
    }
}
impl PyroscopeAgentBuilder {
/// Create a new PyroscopeAgentBuilder object. url and application_name are required.
/// tags and sample_rate are optional.
///
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app");
/// ```
pub fn new(url: impl AsRef<str>, application_name: impl AsRef<str>) -> Self {
Self {
backend: void_backend(VoidConfig::default()), // Default Backend
config: PyroscopeConfig::new(url, application_name),
}
}
/// Set the Pyroscope Server URL. This can be used if the Builder was initialized with the default
/// trait. Default is "http://localhost:4040".
///
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::default()
/// .url("http://localhost:8080")
/// .build()?;
/// ```
pub fn url(self, url: impl AsRef<str>) -> Self {
Self {
config: self.config.url(url),
..self
}
}
/// Set the Application Name. This can be used if the Builder was initialized with the default
/// trait. Default is a randomly generated name.
///
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::default()
/// .application_name("my-app")
/// .build()?;
/// ```
pub fn application_name(self, application_name: impl AsRef<str>) -> Self {
Self {
config: self.config.application_name(application_name),
..self
}
}
/// Set the agent backend. Default is void-backend.
///
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
/// .backend(PprofConfig::new().sample_rate(100))
/// .build()?;
/// ```
pub fn backend(self, backend: BackendImpl<BackendUninitialized>) -> Self {
Self { backend, ..self }
}
/// Set JWT authentication token.
/// This is optional. If not set, the agent will not send any authentication token.
///
/// #Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
/// .auth_token("my-token")
/// .build()
/// ?;
/// ```
pub fn auth_token(self, auth_token: impl AsRef<str>) -> Self {
Self {
config: self.config.auth_token(auth_token.as_ref().to_owned()),
..self
}
}
/// Set tags. Default is empty.
///
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
/// .tags(vec![("env", "dev")])
/// .build()?;
/// ```
pub fn tags(self, tags: Vec<(&str, &str)>) -> Self {
Self {
config: self.config.tags(tags),
..self
}
}
/// Initialize the backend, timer and return a PyroscopeAgent with Ready
/// state. While you can call this method, you should call it through the
/// `PyroscopeAgent.build()` method.
pub fn build(self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
// Set Spy Name, Spy Extension and Sample Rate from the Backend
let config = self.config.sample_rate(self.backend.sample_rate()?);
let config = config.spy_name(self.backend.spy_name()?);
// use match instead of if let to avoid the need to borrow
let config = match self.backend.spy_extension()? {
Some(extension) => {
let application_name = config.application_name.clone();
config.application_name(format!("{}.{}", application_name, extension))
}
None => config,
};
// Set Global Tags
for (key, value) in config.tags.iter() {
self.backend
.add_rule(crate::backend::Rule::GlobalTag(Tag::new(
key.to_owned(),
value.to_owned(),
)))?;
}
// Initialize the Backend
let backend_ready = self.backend.initialize()?;
log::trace!(target: LOG_TAG, "Backend initialized");
// Start the Timer
let timer = Timer::initialize(std::time::Duration::from_secs(10))?;
log::trace!(target: LOG_TAG, "Timer initialized");
// Start the SessionManager
let session_manager = SessionManager::new()?;
log::trace!(target: LOG_TAG, "SessionManager initialized");
// Return PyroscopeAgent
Ok(PyroscopeAgent {
backend: backend_ready,
config,
timer,
session_manager,
tx: None,
handle: None,
running: Arc::new((
#[allow(clippy::mutex_atomic)]
Mutex::new(false),
Condvar::new(),
)),
_state: PhantomData,
})
}
}
/// This trait is used to encode the state of the agent.
///
/// It declares no methods; it only bounds the state parameter of
/// `PyroscopeAgent<S>`.
pub trait PyroscopeAgentState {}
/// Marker struct for an Uninitialized state.
///
/// `PyroscopeAgent<PyroscopeAgentBare>` hosts the `builder` associated function.
#[derive(Debug)]
pub struct PyroscopeAgentBare;
/// Marker struct for a Ready state.
///
/// `PyroscopeAgent<PyroscopeAgentReady>` provides `start()`.
#[derive(Debug)]
pub struct PyroscopeAgentReady;
/// Marker struct for a Running state.
///
/// `PyroscopeAgent<PyroscopeAgentRunning>` provides `stop()` and the tagging methods.
#[derive(Debug)]
pub struct PyroscopeAgentRunning;
// Register the three markers as valid agent states.
impl PyroscopeAgentState for PyroscopeAgentBare {}
impl PyroscopeAgentState for PyroscopeAgentReady {}
impl PyroscopeAgentState for PyroscopeAgentRunning {}
/// PyroscopeAgent is the main object of the library. It is used to start and
/// stop the profiler, schedule the timer, and send the profiler data to the
/// server.
///
/// The type parameter `S` records the agent's state at compile time; it is
/// stored only as `PhantomData`, and each state has its own `impl` block.
#[derive(Debug)]
pub struct PyroscopeAgent<S: PyroscopeAgentState> {
/// Instance of the Timer
timer: Timer,
/// Instance of the SessionManager
session_manager: SessionManager,
/// Sender half of the channel read by the agent thread. `start()` attaches a
/// clone to the timer and stores one here; `stop()` takes it back out.
tx: Option<Sender<TimerSignal>>,
/// Handle to the thread that runs the Pyroscope Agent. `None` until `start()`.
handle: Option<JoinHandle<Result<()>>>,
/// A structure to signal thread termination: the flag is `true` while the
/// agent thread is considered running, and the Condvar wakes `stop()`.
running: Arc<(Mutex<bool>, Condvar)>,
/// Profiler backend
pub backend: BackendImpl<BackendReady>,
/// Configuration Object
pub config: PyroscopeConfig,
/// PyroscopeAgent State
_state: PhantomData<S>,
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
    /// Transition the PyroscopeAgent to a new state.
    ///
    /// Every field is moved unchanged into the returned agent; only the
    /// state marker type differs.
    fn transition<D: PyroscopeAgentState>(self) -> PyroscopeAgent<D> {
        let Self {
            timer,
            session_manager,
            tx,
            handle,
            running,
            backend,
            config,
            ..
        } = self;

        PyroscopeAgent {
            timer,
            session_manager,
            tx,
            handle,
            running,
            backend,
            config,
            _state: PhantomData,
        }
    }
}
impl<S: PyroscopeAgentState> PyroscopeAgent<S> {
/// Properly shutdown the agent.
pub fn shutdown(mut self) {
log::debug!(target: LOG_TAG, "PyroscopeAgent::drop()");
// Shutdown Backend
match self.backend.shutdown() {
Ok(_) => log::debug!(target: LOG_TAG, "Backend shutdown"),
Err(e) => log::error!(target: LOG_TAG, "Backend shutdown error: {}", e),
}
// Drop Timer listeners
match self.timer.drop_listeners() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer listeners"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer listeners"),
}
// Wait for the Timer thread to finish
if let Some(handle) = self.timer.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped timer thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping timer thread"),
}
}
// Stop the SessionManager
match self.session_manager.push(SessionSignal::Kill) {
Ok(_) => log::trace!(target: LOG_TAG, "Sent kill signal to SessionManager"),
Err(_) => log::error!(
target: LOG_TAG,
"Error sending kill signal to SessionManager"
),
}
if let Some(handle) = self.session_manager.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped SessionManager thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping SessionManager thread"),
}
}
// Wait for main thread to finish
if let Some(handle) = self.handle.take() {
match handle.join() {
Ok(_) => log::trace!(target: LOG_TAG, "Dropped main thread"),
Err(_) => log::error!(target: LOG_TAG, "Error Dropping main thread"),
}
}
log::debug!(target: LOG_TAG, "Agent Shutdown");
}
}
impl PyroscopeAgent<PyroscopeAgentBare> {
/// Short-hand for `PyroscopeAgentBuilder::new()`. This is a convenience method.
///
/// Both arguments share the single type parameter `S`, so they must have
/// the same type (for example two `&str` values).
///
/// # Example
/// ```ignore
/// let agent = PyroscopeAgent::builder("http://localhost:8080", "my-app").build()?;
/// ```
pub fn builder<S: AsRef<str>>(url: S, application_name: S) -> PyroscopeAgentBuilder {
// Delegate to the builder constructor.
PyroscopeAgentBuilder::new(url, application_name)
}
}
impl PyroscopeAgent<PyroscopeAgentReady> {
    /// Start profiling and sending data. The agent keeps running until it is
    /// stopped. A report is generated and handed to the SessionManager each
    /// time the timer signals the next snapshot.
    ///
    /// # Errors
    /// Returns an error if the running flag cannot be locked or if the timer
    /// listener cannot be attached.
    ///
    /// # Example
    /// ```ignore
    /// let agent = PyroscopeAgent::builder("http://localhost:8080", "my-app").build()?;
    /// let agent_running = agent.start()?;
    /// ```
    pub fn start(mut self) -> Result<PyroscopeAgent<PyroscopeAgentRunning>> {
        log::debug!(target: LOG_TAG, "Starting");

        // Shared handle to the backend for the agent thread
        let backend = Arc::clone(&self.backend.backend);

        // Mark the agent as running
        let pair = Arc::clone(&self.running);
        let (lock, _cvar) = &*pair;
        let mut running = lock.lock()?;
        *running = true;
        drop(running);

        // Create a channel to listen for timer signals
        let (tx, rx) = mpsc::channel();
        self.timer.attach_listener(tx.clone())?;
        self.tx = Some(tx);

        let config = self.config.clone();

        // Clone SessionManager Sender
        let stx = self.session_manager.tx.clone();

        self.handle = Some(std::thread::spawn(move || {
            log::trace!(target: LOG_TAG, "Main Thread started");

            // Run the signal loop in its own closure so that every exit path
            // (terminate signal, closed channel, or an error) reaches the
            // clean-up below.
            let outcome = (|| -> Result<()> {
                while let Ok(signal) = rx.recv() {
                    match signal {
                        TimerSignal::NextSnapshot(until) => {
                            log::trace!(target: LOG_TAG, "Sending session {}", until);

                            // Generate report from backend
                            let report = backend
                                .lock()?
                                .as_mut()
                                .ok_or_else(|| {
                                    PyroscopeError::AdHoc(
                                        "PyroscopeAgent - Failed to unwrap backend".to_string(),
                                    )
                                })?
                                .report()?;

                            // Send new Session to SessionManager
                            stx.send(SessionSignal::Session(Session::new(
                                until,
                                config.clone(),
                                report,
                            )?))?;
                        }
                        TimerSignal::Terminate => {
                            log::trace!(target: LOG_TAG, "Session Killed");
                            return Ok(());
                        }
                    }
                }
                Ok(())
            })();

            // Always clear the running flag and wake a waiting `stop()`. If
            // this only happened on `Terminate`, a snapshot that failed while
            // `stop()` was waiting would leave it blocked forever.
            {
                let (lock, cvar) = &*pair;
                let mut running = lock.lock()?;
                *running = false;
                cvar.notify_one();
            }

            outcome
        }));

        Ok(self.transition())
    }
}
impl PyroscopeAgent<PyroscopeAgentRunning> {
/// Stop the agent. The agent will stop profiling and send a last report to the server.
///
/// # Example
/// ```ignore
/// let agent = PyroscopeAgent::builder("http://localhost:8080", "my-app").build()?;
/// let agent_running = agent.start()?;
/// // Expensive operation
/// let agent_ready = agent_running.stop();
/// ```
pub fn stop(mut self) -> Result<PyroscopeAgent<PyroscopeAgentReady>> {
log::debug!(target: LOG_TAG, "Stopping");
// get tx and send termination signal
if let Some(sender) = self.tx.take() {
// Send last session
sender.send(TimerSignal::NextSnapshot(get_time_range(0)?.until))?;
// Terminate PyroscopeAgent internal thread
sender.send(TimerSignal::Terminate)?;
} else {
log::error!("PyroscopeAgent - Missing sender")
}
// Wait for the Thread to finish
let pair = Arc::clone(&self.running);
let (lock, cvar) = &*pair;
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;
Ok(self.transition())
}
/// Return a tuple of functions to add and remove tags to the agent across
/// thread boundaries. This function can be called multiple times.
///
/// # Example
/// ```ignore
/// let agent = PyroscopeAgent::builder("http://localhost:8080", "my-app").build()?;
/// let agent_running = agent.start()?;
/// let (add_tag, remove_tag) = agent_running.tag_wrapper();
/// ```
///
/// The functions can be later called from any thread.
///
/// # Example
/// ```ignore
/// add_tag("key".to_string(), "value".to_string());
/// // some computation
/// remove_tag("key".to_string(), "value".to_string());
/// ```
pub fn tag_wrapper(
&self,
) -> (
impl Fn(String, String) -> Result<()>,
impl Fn(String, String) -> Result<()>,
) {
let backend_add = self.backend.backend.clone();
let backend_remove = self.backend.backend.clone();
(
move |key, value| {
let thread_id = crate::utils::pthread_self()?;
let rule = Rule::ThreadTag(thread_id, Tag::new(key, value));
let backend = backend_add.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.add_rule(rule)?;
Ok(())
},
move |key, value| {
let thread_id = crate::utils::pthread_self()?;
let rule = Rule::ThreadTag(thread_id, Tag::new(key, value));
let backend = backend_remove.lock()?;
backend
.as_ref()
.ok_or_else(|| {
PyroscopeError::AdHoc(
"PyroscopeAgent - Failed to unwrap backend".to_string(),
)
})?
.remove_rule(rule)?;
Ok(())
},
)
}
/// Add a global Tag rule to the backend Ruleset. For tagging, it's
/// recommended to use the `tag_wrapper` function.
pub fn add_global_tag(&self, tag: Tag) -> Result<()> {
let rule = Rule::GlobalTag(tag);
self.backend.add_rule(rule)?;
Ok(())
}
/// Remove a global Tag rule from the backend Ruleset. For tagging, it's
/// recommended to use the `tag_wrapper` function.
pub fn remove_global_tag(&self, tag: Tag) -> Result<()> {
let rule = Rule::GlobalTag(tag);
self.backend.remove_rule(rule)?;
Ok(())
}
/// Add a thread Tag rule to the backend Ruleset. For tagging, it's
/// recommended to use the `tag_wrapper` function.
pub fn add_thread_tag(&self, thread_id: u64, tag: Tag) -> Result<()> {
let rule = Rule::ThreadTag(thread_id, tag);
self.backend.add_rule(rule)?;
Ok(())
}
/// Remove a thread Tag rule from the backend Ruleset. For tagging, it's
/// recommended to use the `tag_wrapper` function.
pub fn remove_thread_tag(&self, thread_id: u64, tag: Tag) -> Result<()> {
let rule = Rule::ThreadTag(thread_id, tag);
self.backend.remove_rule(rule)?;
Ok(())
}
}