pyroscope_pyspy/
lib.rs

1use py_spy::{config::Config, sampler::Sampler};
2use pyroscope::{
3    backend::{
4        Backend, BackendConfig, BackendImpl, BackendUninitialized, Report, Rule, Ruleset,
5        StackBuffer, StackFrame, StackTrace,
6    },
7    error::{PyroscopeError, Result},
8};
9use std::{
10    ops::Deref,
11    sync::{
12        atomic::{AtomicBool, Ordering},
13        Arc, Mutex,
14    },
15    thread::JoinHandle,
16};
17
/// Log target used for the records this backend emits through the `log` macros.
const LOG_TAG: &str = "Pyroscope::Pyspy";
19
20/// Short-hand function for creating a new Pyspy backend.
21pub fn pyspy_backend(config: PyspyConfig) -> BackendImpl<BackendUninitialized> {
22    // Clone BackendConfig to pass to the backend object.
23    let backend_config = config.backend_config.clone();
24
25    BackendImpl::new(Box::new(Pyspy::new(config)), Some(backend_config))
26}
27
/// Configuration for the py-spy backend.
///
/// Values are set through the builder-style methods on this type and are
/// translated into a `py_spy::config::Config` when the backend is initialized.
#[derive(Debug, Clone)]
pub struct PyspyConfig {
    /// PID of the process to monitor. Initialization fails when this is `None`.
    pid: Option<i32>,
    /// Sampling rate, forwarded to py-spy as its `sampling_rate`.
    sample_rate: u32,
    /// Generic backend settings (which pid/thread tags to report).
    backend_config: BackendConfig,
    /// Locking strategy py-spy uses on the target process while sampling.
    lock_process: py_spy::config::LockingStrategy,
    /// Profiling duration (`None` for unlimited). Converted to whole seconds
    /// when handed to py-spy.
    time_limit: Option<core::time::Duration>,
    /// Include subprocesses of the target process.
    with_subprocesses: bool,
    /// Keep traces of threads that are not active (idle time).
    include_idle: bool,
    /// Keep only traces of threads that own the Python GIL.
    gil_only: bool,
    /// Profile native C extensions.
    native: bool,
}
50
51impl Default for PyspyConfig {
52    fn default() -> Self {
53        PyspyConfig {
54            pid: Some(0),
55            sample_rate: 100,
56            backend_config: BackendConfig::default(),
57            lock_process: py_spy::config::LockingStrategy::NonBlocking,
58            time_limit: None,
59            with_subprocesses: false,
60            include_idle: false,
61            gil_only: false,
62            native: false,
63        }
64    }
65}
66
67impl PyspyConfig {
68    /// Create a new PyspyConfig
69    pub fn new(pid: i32) -> Self {
70        PyspyConfig {
71            pid: Some(pid),
72            ..Default::default()
73        }
74    }
75
76    /// Set the sampling rate
77    pub fn sample_rate(self, sample_rate: u32) -> Self {
78        PyspyConfig {
79            sample_rate,
80            ..self
81        }
82    }
83
84    /// Tag thread id in report
85    pub fn report_pid(self) -> Self {
86        let backend_config = BackendConfig {
87            report_pid: true,
88            ..self.backend_config
89        };
90
91        PyspyConfig {
92            backend_config,
93            ..self
94        }
95    }
96
97    /// Tag thread id in report
98    pub fn report_thread_id(self) -> Self {
99        let backend_config = BackendConfig {
100            report_thread_id: true,
101            ..self.backend_config
102        };
103
104        PyspyConfig {
105            backend_config,
106            ..self
107        }
108    }
109
110    /// Tag thread name in report
111    pub fn report_thread_name(self) -> Self {
112        let backend_config = BackendConfig {
113            report_thread_name: true,
114            ..self.backend_config
115        };
116
117        PyspyConfig {
118            backend_config,
119            ..self
120        }
121    }
122
123    /// Set the lock process flag
124    pub fn lock_process(self, lock_process: bool) -> Self {
125        PyspyConfig {
126            lock_process: if lock_process {
127                py_spy::config::LockingStrategy::Lock
128            } else {
129                py_spy::config::LockingStrategy::NonBlocking
130            },
131            ..self
132        }
133    }
134
135    /// Set the time limit
136    pub fn time_limit(self, time_limit: Option<core::time::Duration>) -> Self {
137        PyspyConfig { time_limit, ..self }
138    }
139
140    /// Include subprocesses
141    pub fn with_subprocesses(self, with_subprocesses: bool) -> Self {
142        PyspyConfig {
143            with_subprocesses,
144            ..self
145        }
146    }
147
148    /// Include idle time
149    pub fn include_idle(self, include_idle: bool) -> Self {
150        PyspyConfig {
151            include_idle,
152            ..self
153        }
154    }
155
156    /// Detect Python GIL
157    pub fn gil_only(self, gil_only: bool) -> Self {
158        PyspyConfig { gil_only, ..self }
159    }
160
161    /// Profile native C extensions
162    pub fn native(self, native: bool) -> Self {
163        PyspyConfig { native, ..self }
164    }
165}
166
/// Pyspy Backend
///
/// Owns the state shared between the caller-facing `Backend` methods and the
/// sampler thread spawned by `initialize`.
#[derive(Default)]
pub struct Pyspy {
    /// Profiling buffer: filled by the sampler thread, drained by `report`.
    buffer: Arc<Mutex<StackBuffer>>,
    /// Pyspy Configuration
    config: PyspyConfig,
    /// py-spy sampler configuration, built from `config` in `initialize`
    /// (`None` until then).
    sampler_config: Option<Config>,
    /// Handle of the sampler thread; set by `initialize`, joined by `shutdown`.
    sampler_thread: Option<JoinHandle<Result<()>>>,
    /// Atomic flag polled by the sampler thread; storing `false` stops it.
    running: Arc<AtomicBool>,
    /// Ruleset applied to every stack trace before it is recorded.
    ruleset: Arc<Mutex<Ruleset>>,
}
183
184impl std::fmt::Debug for Pyspy {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        write!(f, "Pyspy Backend")
187    }
188}
189
190impl Pyspy {
191    /// Create a new Pyspy Backend.
192    pub fn new(config: PyspyConfig) -> Self {
193        Pyspy {
194            buffer: Arc::new(Mutex::new(StackBuffer::default())),
195            config,
196            sampler_config: None,
197            sampler_thread: None,
198            running: Arc::new(AtomicBool::new(false)),
199            ruleset: Arc::new(Mutex::new(Ruleset::default())),
200        }
201    }
202}
203
204impl Backend for Pyspy {
205    /// Return the name of the backend.
206    fn spy_name(&self) -> Result<String> {
207        Ok("pyspy".to_string())
208    }
209
210    /// Return the extension of the backend.
211    fn spy_extension(&self) -> Result<Option<String>> {
212        Ok(Some("cpu".to_string()))
213    }
214
215    /// Return the sample rate.
216    fn sample_rate(&self) -> Result<u32> {
217        Ok(self.config.sample_rate)
218    }
219
220    fn set_config(&self, config: BackendConfig) {}
221
222    fn get_config(&self) -> Result<BackendConfig> {
223        Ok(self.config.backend_config)
224    }
225
226    /// Add a rule to the ruleset.
227    fn add_rule(&self, rule: Rule) -> Result<()> {
228        self.ruleset.lock()?.add_rule(rule)?;
229
230        Ok(())
231    }
232
233    /// Remove a rule from the ruleset.
234    fn remove_rule(&self, rule: Rule) -> Result<()> {
235        self.ruleset.lock()?.remove_rule(rule)?;
236
237        Ok(())
238    }
239
240    /// Initialize the backend.
241    fn initialize(&mut self) -> Result<()> {
242        // Check if a process ID is set
243        if self.config.pid.is_none() {
244            return Err(PyroscopeError::new("Pyspy: No Process ID Specified"));
245        }
246
247        // Set duration for py-spy
248        let duration = match self.config.time_limit {
249            Some(duration) => py_spy::config::RecordDuration::Seconds(duration.as_secs()),
250            None => py_spy::config::RecordDuration::Unlimited,
251        };
252
253        // Create a new py-spy configuration
254        self.sampler_config = Some(Config {
255            blocking: self.config.lock_process.clone(),
256            native: self.config.native,
257            pid: self.config.pid,
258            sampling_rate: self.config.sample_rate as u64,
259            include_idle: self.config.include_idle,
260            include_thread_ids: true,
261            subprocesses: self.config.with_subprocesses,
262            gil_only: self.config.gil_only,
263            duration,
264            ..Config::default()
265        });
266
267        // set sampler state to running
268        let running = Arc::clone(&self.running);
269        running.store(true, Ordering::Relaxed);
270
271        // create a new buffer reference
272        let buffer = self.buffer.clone();
273
274        // create a new sampler_config reference
275        let config = self
276            .sampler_config
277            .clone()
278            .ok_or_else(|| PyroscopeError::new("Pyspy: Sampler configuration is not set"))?;
279
280        // create a new ruleset reference
281        let ruleset = self.ruleset.clone();
282
283        let backend_config = self.config.backend_config.clone();
284
285        self.sampler_thread = Some(std::thread::spawn(move || {
286            // Get PID
287            let pid = config
288                .pid
289                .ok_or_else(|| PyroscopeError::new("Pyspy: PID is not set"))?;
290
291            // Create a new pyspy sampler
292            let sampler = Sampler::new(pid, &config)
293                .map_err(|e| PyroscopeError::new(&format!("Pyspy: Sampler Error: {}", e)))?;
294
295            // Keep the sampler running until the running flag is set to false
296            let sampler_output = sampler.take_while(|_x| running.load(Ordering::Relaxed));
297
298            // Collect the sampler output
299            for sample in sampler_output {
300                for trace in sample.traces {
301                    // idle config
302                    if !(config.include_idle || trace.active) {
303                        continue;
304                    }
305
306                    // gil config
307                    if config.gil_only && !trace.owns_gil {
308                        continue;
309                    }
310
311                    // Convert py-spy trace to a Pyroscope trace
312                    let own_trace: StackTrace =
313                        Into::<StackTraceWrapper>::into((trace.clone(), &backend_config)).into();
314
315                    // apply ruleset
316                    let stacktrace = own_trace + &ruleset.lock()?.clone();
317
318                    // Add the trace to the buffer
319                    buffer.lock()?.record(stacktrace)?;
320                }
321            }
322
323            Ok(())
324        }));
325
326        Ok(())
327    }
328
329    /// Shutdown the backend.
330    fn shutdown(self: Box<Self>) -> Result<()> {
331        log::trace!(target: LOG_TAG, "Shutting down sampler thread");
332
333        // set running to false, terminate sampler thread
334        self.running.store(false, Ordering::Relaxed);
335
336        // wait for sampler thread to finish
337        self.sampler_thread
338            .ok_or_else(|| PyroscopeError::new("Pyspy: Failed to unwrap Sampler Thread"))?
339            .join()
340            .unwrap_or_else(|_| Err(PyroscopeError::new("Pyspy: Failed to join sampler thread")))?;
341
342        Ok(())
343    }
344
345    /// Report buffer
346    fn report(&mut self) -> Result<Vec<Report>> {
347        // convert the buffer report into a byte vector
348        let report: StackBuffer = self.buffer.lock()?.deref().to_owned();
349        let reports: Vec<Report> = report.into();
350
351        // Clear the buffer
352        self.buffer.lock()?.clear();
353
354        Ok(reports)
355    }
356}
357
/// Wrapper for StackFrame. This is needed because neither StackFrame nor
/// py_spy::Frame is defined in this crate, so Rust's orphan rule forbids
/// implementing `From<py_spy::Frame>` for StackFrame directly.
struct StackFrameWrapper(StackFrame);
361
362impl From<StackFrameWrapper> for StackFrame {
363    fn from(stack_frame: StackFrameWrapper) -> Self {
364        stack_frame.0
365    }
366}
367
368impl From<py_spy::Frame> for StackFrameWrapper {
369    fn from(frame: py_spy::Frame) -> Self {
370        StackFrameWrapper(StackFrame {
371            module: frame.module.clone(),
372            name: Some(frame.name.clone()),
373            filename: frame.short_filename.clone(),
374            relative_path: None,
375            absolute_path: Some(frame.filename.clone()),
376            line: Some(frame.line as u32),
377        })
378    }
379}
380
/// Wrapper for StackTrace. This is needed because neither StackTrace nor
/// py_spy::StackTrace is defined in this crate, so Rust's orphan rule forbids
/// implementing a `From` conversion between them directly.
struct StackTraceWrapper(StackTrace);
384
385impl From<StackTraceWrapper> for StackTrace {
386    fn from(stack_trace: StackTraceWrapper) -> Self {
387        stack_trace.0
388    }
389}
390
391impl From<(py_spy::StackTrace, &BackendConfig)> for StackTraceWrapper {
392    fn from(arg: (py_spy::StackTrace, &BackendConfig)) -> Self {
393        let (stack_trace, config) = arg;
394        let stacktrace = StackTrace::new(
395            config,
396            Some(stack_trace.pid as u32),
397            Some(stack_trace.thread_id as u64),
398            stack_trace.thread_name.clone(),
399            stack_trace
400                .frames
401                .iter()
402                .map(|frame| Into::<StackFrameWrapper>::into(frame.clone()).into())
403                .collect(),
404        );
405        StackTraceWrapper(stacktrace)
406    }
407}