use pyroscope::{
backend::{
Backend, BackendConfig, BackendImpl, BackendUninitialized, Report, Rule, Ruleset,
StackBuffer, StackFrame, StackTrace,
},
error::{PyroscopeError, Result},
};
use rbspy::sampler::Sampler;
use std::{
ops::Deref,
sync::{
mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
Arc, Mutex,
},
thread::JoinHandle,
};
/// Builds an uninitialized pyroscope backend driven by an [`Rbspy`] profiler.
///
/// The `BackendConfig` embedded in `config` is cloned and passed to
/// `BackendImpl::new` next to the boxed profiler; `config` itself is moved
/// into the profiler.
pub fn rbspy_backend(config: RbspyConfig) -> BackendImpl<BackendUninitialized> {
    // Clone before `config` is moved into the profiler on the next line.
    let shared_config = Some(config.backend_config.clone());
    let profiler = Box::new(Rbspy::new(config));
    BackendImpl::new(profiler, shared_config)
}
/// Configuration for the rbspy profiling backend.
///
/// Created with [`RbspyConfig::new`] or `Default::default()`, adjusted through
/// the chained setter methods, and finally handed to [`Rbspy::new`].
#[derive(Debug)]
pub struct RbspyConfig {
/// Process ID of the profiling target. `Rbspy::initialize` returns an error
/// while this is `None`.
pid: Option<i32>,
/// Sample rate forwarded to the rbspy sampler and reported by
/// `Backend::sample_rate`; it also sizes the stack-trace channel.
/// NOTE(review): presumably samples per second - confirm against rbspy.
sample_rate: u32,
/// Backend-level options (`report_pid`, `report_thread_id`, ...) used when
/// converting rbspy stack traces.
backend_config: BackendConfig,
/// Forwarded unchanged to `Sampler::new`.
/// NOTE(review): presumably pauses the target while sampling - confirm.
lock_process: bool,
/// Optional time limit forwarded unchanged to `Sampler::new`.
time_limit: Option<core::time::Duration>,
/// Forwarded unchanged to `Sampler::new`.
/// NOTE(review): presumably also profiles child processes - confirm.
with_subprocesses: bool,
}
impl Default for RbspyConfig {
    /// Returns the baseline configuration: no target PID, a sample rate of
    /// 100, the default `BackendConfig`, no time limit, and both process
    /// locking and subprocess profiling switched off.
    fn default() -> Self {
        Self {
            backend_config: BackendConfig::default(),
            sample_rate: 100,
            pid: None,
            time_limit: None,
            lock_process: false,
            with_subprocesses: false,
        }
    }
}
impl RbspyConfig {
pub fn new(pid: i32) -> Self {
RbspyConfig {
pid: Some(pid),
..Default::default()
}
}
pub fn sample_rate(self, sample_rate: u32) -> Self {
RbspyConfig {
sample_rate,
..self
}
}
pub fn report_pid(self) -> Self {
let backend_config = BackendConfig {
report_pid: true,
..self.backend_config
};
RbspyConfig {
backend_config,
..self
}
}
pub fn report_thread_id(self) -> Self {
let backend_config = BackendConfig {
report_thread_id: true,
..self.backend_config
};
RbspyConfig {
backend_config,
..self
}
}
pub fn lock_process(self, lock_process: bool) -> Self {
RbspyConfig {
lock_process,
..self
}
}
pub fn time_limit(self, time_limit: Option<core::time::Duration>) -> Self {
RbspyConfig { time_limit, ..self }
}
pub fn with_subprocesses(self, with_subprocesses: bool) -> Self {
RbspyConfig {
with_subprocesses,
..self
}
}
}
/// rbspy-based profiler that implements the pyroscope `Backend` trait.
#[derive(Default)]
pub struct Rbspy {
/// Settings supplied at construction time.
config: RbspyConfig,
/// The rbspy sampler; `None` until `initialize` creates it.
sampler: Option<Sampler>,
/// Receiving end of the error channel handed to `Sampler::start`; `None`
/// until `initialize` runs. It is not read anywhere in the visible code.
error_receiver: Option<Receiver<std::result::Result<(), anyhow::Error>>>,
/// Stack traces collected so far. Shared with the thread spawned by
/// `initialize`, which records into it; `report` snapshots and clears it.
buffer: Arc<Mutex<StackBuffer>>,
/// Rules combined with every converted stack trace before it is recorded.
ruleset: Ruleset,
}
impl std::fmt::Debug for Rbspy {
    /// Writes the fixed label `Rbspy Backend`; no internal state is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Rbspy Backend")
    }
}
impl Rbspy {
    /// Creates a profiler from `config` with an empty stack buffer, a default
    /// ruleset, and no sampler or error receiver yet (both are set up later
    /// by `initialize`).
    pub fn new(config: RbspyConfig) -> Self {
        let buffer = Arc::new(Mutex::new(StackBuffer::default()));
        let ruleset = Ruleset::default();
        Self {
            config,
            sampler: None,
            error_receiver: None,
            buffer,
            ruleset,
        }
    }
}
/// Sending half of the channel that `initialize` passes to `Sampler::start`
/// as its error/result sender.
type ErrorSender = Sender<std::result::Result<(), anyhow::Error>>;
/// Receiving half of the same channel; `initialize` stores it in
/// `Rbspy::error_receiver`.
type ErrorReceiver = Receiver<std::result::Result<(), anyhow::Error>>;
impl Backend for Rbspy {
fn spy_name(&self) -> Result<String> {
Ok("rbspy".to_string())
}
fn spy_extension(&self) -> Result<Option<String>> {
Ok(Some("cpu".to_string()))
}
fn sample_rate(&self) -> Result<u32> {
Ok(self.config.sample_rate)
}
fn set_config(&self, config: BackendConfig) {}
fn get_config(&self) -> Result<BackendConfig> {
Ok(self.config.backend_config)
}
fn add_rule(&self, rule: Rule) -> Result<()> {
self.ruleset.add_rule(rule)?;
Ok(())
}
fn remove_rule(&self, rule: Rule) -> Result<()> {
self.ruleset.remove_rule(rule)?;
Ok(())
}
fn initialize(&mut self) -> Result<()> {
if self.config.pid.is_none() {
return Err(PyroscopeError::new("Rbspy: No Process ID Specified"));
}
self.sampler = Some(Sampler::new(
self.config.pid.unwrap(), self.config.sample_rate,
self.config.lock_process,
self.config.time_limit,
self.config.with_subprocesses,
None,
true,
));
let (error_sender, error_receiver): (ErrorSender, ErrorReceiver) = channel();
let queue_size: usize = self.config.sample_rate as usize * 10 * 100;
let (stack_sender, stack_receiver): (
SyncSender<rbspy::StackTrace>,
Receiver<rbspy::StackTrace>,
) = sync_channel(queue_size);
self.error_receiver = Some(error_receiver);
let sampler = self
.sampler
.as_ref()
.ok_or_else(|| PyroscopeError::new("Rbspy: Sampler is not set"))?;
sampler
.start(stack_sender, error_sender)
.map_err(|e| PyroscopeError::new(&format!("Rbspy: Sampler Error: {}", e)))?;
let buffer = self.buffer.clone();
let ruleset = self.ruleset.clone();
let backend_config = self.config.backend_config.clone();
let _: JoinHandle<Result<()>> = std::thread::spawn(move || {
while let Ok(stack_trace) = stack_receiver.recv() {
let own_trace: StackTrace =
Into::<StackTraceWrapper>::into((stack_trace, &backend_config)).into();
let stacktrace = own_trace + &ruleset;
buffer.lock()?.record(stacktrace)?;
}
Ok(())
});
Ok(())
}
fn shutdown(self: Box<Self>) -> Result<()> {
self.sampler
.as_ref()
.ok_or_else(|| PyroscopeError::new("Rbspy: Sampler is not set"))?
.stop();
Ok(())
}
fn report(&mut self) -> Result<Vec<Report>> {
let v8: StackBuffer = self.buffer.lock()?.deref().to_owned();
let reports: Vec<Report> = v8.into();
self.buffer.lock()?.clear();
Ok(reports)
}
}
/// Newtype around pyroscope's `StackFrame`; the `From` impls in this file use
/// it as the intermediate step when converting an `rbspy::StackFrame`.
struct StackFrameWrapper(StackFrame);
impl From<StackFrameWrapper> for StackFrame {
fn from(frame: StackFrameWrapper) -> Self {
frame.0
}
}
impl From<rbspy::StackFrame> for StackFrameWrapper {
    /// Converts an rbspy frame into a wrapped pyroscope `StackFrame`.
    ///
    /// `module` is left empty, and `filename` receives a copy of the frame's
    /// relative path (the same value that is stored in `relative_path`).
    fn from(frame: rbspy::StackFrame) -> Self {
        let rbspy::StackFrame {
            name,
            relative_path,
            absolute_path,
            lineno,
            ..
        } = frame;

        let converted = StackFrame {
            module: None,
            name: Some(name),
            // The relative path doubles as the file name.
            filename: Some(relative_path.clone()),
            relative_path: Some(relative_path),
            absolute_path,
            line: Some(lineno),
        };
        StackFrameWrapper(converted)
    }
}
/// Newtype around pyroscope's `StackTrace`; the `From` impls in this file use
/// it as the intermediate step when converting an `rbspy::StackTrace`.
struct StackTraceWrapper(StackTrace);
impl From<StackTraceWrapper> for StackTrace {
fn from(trace: StackTraceWrapper) -> Self {
trace.0
}
}
impl From<(rbspy::StackTrace, &BackendConfig)> for StackTraceWrapper {
fn from(arg: (rbspy::StackTrace, &BackendConfig)) -> Self {
let (trace, config) = arg;
StackTraceWrapper(StackTrace::new(
config,
trace.pid.map(|pid| pid as u32),
trace.thread_id.map(|id| id as u64),
None,
trace
.iter()
.map(|frame| Into::<StackFrameWrapper>::into(frame.clone()).into())
.collect(),
))
}
}