1use py_spy::{config::Config, sampler::Sampler};
2use pyroscope::{
3 backend::{
4 Backend, BackendConfig, BackendImpl, BackendUninitialized, Report, Rule, Ruleset,
5 StackBuffer, StackFrame, StackTrace,
6 },
7 error::{PyroscopeError, Result},
8};
9use std::{
10 ops::Deref,
11 sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc, Mutex,
14 },
15 thread::JoinHandle,
16};
17
/// Log target attached to this backend's `log` records.
const LOG_TAG: &str = "Pyroscope::Pyspy";
19
20pub fn pyspy_backend(config: PyspyConfig) -> BackendImpl<BackendUninitialized> {
22 let backend_config = config.backend_config.clone();
24
25 BackendImpl::new(Box::new(Pyspy::new(config)), Some(backend_config))
26}
27
/// Configuration for the py-spy based profiling backend.
#[derive(Debug, Clone)]
pub struct PyspyConfig {
    /// Process ID handed to the py-spy sampler; `initialize` fails when this is `None`.
    pid: Option<i32>,
    /// Sample rate reported by the backend and passed to py-spy as `sampling_rate`.
    sample_rate: u32,
    /// Backend configuration shared with `BackendImpl` (holds the `report_*` flags).
    backend_config: BackendConfig,
    /// py-spy locking strategy, used as the sampler's `blocking` setting.
    lock_process: py_spy::config::LockingStrategy,
    /// Optional recording time limit; converted to whole seconds for py-spy.
    time_limit: Option<core::time::Duration>,
    /// Forwarded to py-spy's `subprocesses` option.
    with_subprocesses: bool,
    /// Forwarded to py-spy's `include_idle`; when false, inactive traces are skipped.
    include_idle: bool,
    /// Forwarded to py-spy's `gil_only`; when true, traces not owning the GIL are skipped.
    gil_only: bool,
    /// Forwarded to py-spy's `native` option.
    native: bool,
}
50
51impl Default for PyspyConfig {
52 fn default() -> Self {
53 PyspyConfig {
54 pid: Some(0),
55 sample_rate: 100,
56 backend_config: BackendConfig::default(),
57 lock_process: py_spy::config::LockingStrategy::NonBlocking,
58 time_limit: None,
59 with_subprocesses: false,
60 include_idle: false,
61 gil_only: false,
62 native: false,
63 }
64 }
65}
66
67impl PyspyConfig {
68 pub fn new(pid: i32) -> Self {
70 PyspyConfig {
71 pid: Some(pid),
72 ..Default::default()
73 }
74 }
75
76 pub fn sample_rate(self, sample_rate: u32) -> Self {
78 PyspyConfig {
79 sample_rate,
80 ..self
81 }
82 }
83
84 pub fn report_pid(self) -> Self {
86 let backend_config = BackendConfig {
87 report_pid: true,
88 ..self.backend_config
89 };
90
91 PyspyConfig {
92 backend_config,
93 ..self
94 }
95 }
96
97 pub fn report_thread_id(self) -> Self {
99 let backend_config = BackendConfig {
100 report_thread_id: true,
101 ..self.backend_config
102 };
103
104 PyspyConfig {
105 backend_config,
106 ..self
107 }
108 }
109
110 pub fn report_thread_name(self) -> Self {
112 let backend_config = BackendConfig {
113 report_thread_name: true,
114 ..self.backend_config
115 };
116
117 PyspyConfig {
118 backend_config,
119 ..self
120 }
121 }
122
123 pub fn lock_process(self, lock_process: bool) -> Self {
125 PyspyConfig {
126 lock_process: if lock_process {
127 py_spy::config::LockingStrategy::Lock
128 } else {
129 py_spy::config::LockingStrategy::NonBlocking
130 },
131 ..self
132 }
133 }
134
135 pub fn time_limit(self, time_limit: Option<core::time::Duration>) -> Self {
137 PyspyConfig { time_limit, ..self }
138 }
139
140 pub fn with_subprocesses(self, with_subprocesses: bool) -> Self {
142 PyspyConfig {
143 with_subprocesses,
144 ..self
145 }
146 }
147
148 pub fn include_idle(self, include_idle: bool) -> Self {
150 PyspyConfig {
151 include_idle,
152 ..self
153 }
154 }
155
156 pub fn gil_only(self, gil_only: bool) -> Self {
158 PyspyConfig { gil_only, ..self }
159 }
160
161 pub fn native(self, native: bool) -> Self {
163 PyspyConfig { native, ..self }
164 }
165}
166
/// py-spy backend state: configuration plus the handles shared with the sampler thread.
#[derive(Default)]
pub struct Pyspy {
    /// Stack traces recorded by the sampler thread; read and cleared by `report`.
    buffer: Arc<Mutex<StackBuffer>>,
    /// User-supplied backend configuration.
    config: PyspyConfig,
    /// py-spy configuration, built from `config` in `initialize`.
    sampler_config: Option<Config>,
    /// Handle of the sampler thread spawned in `initialize` and joined in `shutdown`.
    sampler_thread: Option<JoinHandle<Result<()>>>,
    /// Flag polled by the sampler thread; set in `initialize`, cleared in `shutdown`.
    running: Arc<AtomicBool>,
    /// Rules combined with every stack trace before it is recorded.
    ruleset: Arc<Mutex<Ruleset>>,
}
183
184impl std::fmt::Debug for Pyspy {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 write!(f, "Pyspy Backend")
187 }
188}
189
190impl Pyspy {
191 pub fn new(config: PyspyConfig) -> Self {
193 Pyspy {
194 buffer: Arc::new(Mutex::new(StackBuffer::default())),
195 config,
196 sampler_config: None,
197 sampler_thread: None,
198 running: Arc::new(AtomicBool::new(false)),
199 ruleset: Arc::new(Mutex::new(Ruleset::default())),
200 }
201 }
202}
203
204impl Backend for Pyspy {
205 fn spy_name(&self) -> Result<String> {
207 Ok("pyspy".to_string())
208 }
209
210 fn spy_extension(&self) -> Result<Option<String>> {
212 Ok(Some("cpu".to_string()))
213 }
214
215 fn sample_rate(&self) -> Result<u32> {
217 Ok(self.config.sample_rate)
218 }
219
220 fn set_config(&self, config: BackendConfig) {}
221
222 fn get_config(&self) -> Result<BackendConfig> {
223 Ok(self.config.backend_config)
224 }
225
226 fn add_rule(&self, rule: Rule) -> Result<()> {
228 self.ruleset.lock()?.add_rule(rule)?;
229
230 Ok(())
231 }
232
233 fn remove_rule(&self, rule: Rule) -> Result<()> {
235 self.ruleset.lock()?.remove_rule(rule)?;
236
237 Ok(())
238 }
239
240 fn initialize(&mut self) -> Result<()> {
242 if self.config.pid.is_none() {
244 return Err(PyroscopeError::new("Pyspy: No Process ID Specified"));
245 }
246
247 let duration = match self.config.time_limit {
249 Some(duration) => py_spy::config::RecordDuration::Seconds(duration.as_secs()),
250 None => py_spy::config::RecordDuration::Unlimited,
251 };
252
253 self.sampler_config = Some(Config {
255 blocking: self.config.lock_process.clone(),
256 native: self.config.native,
257 pid: self.config.pid,
258 sampling_rate: self.config.sample_rate as u64,
259 include_idle: self.config.include_idle,
260 include_thread_ids: true,
261 subprocesses: self.config.with_subprocesses,
262 gil_only: self.config.gil_only,
263 duration,
264 ..Config::default()
265 });
266
267 let running = Arc::clone(&self.running);
269 running.store(true, Ordering::Relaxed);
270
271 let buffer = self.buffer.clone();
273
274 let config = self
276 .sampler_config
277 .clone()
278 .ok_or_else(|| PyroscopeError::new("Pyspy: Sampler configuration is not set"))?;
279
280 let ruleset = self.ruleset.clone();
282
283 let backend_config = self.config.backend_config.clone();
284
285 self.sampler_thread = Some(std::thread::spawn(move || {
286 let pid = config
288 .pid
289 .ok_or_else(|| PyroscopeError::new("Pyspy: PID is not set"))?;
290
291 let sampler = Sampler::new(pid, &config)
293 .map_err(|e| PyroscopeError::new(&format!("Pyspy: Sampler Error: {}", e)))?;
294
295 let sampler_output = sampler.take_while(|_x| running.load(Ordering::Relaxed));
297
298 for sample in sampler_output {
300 for trace in sample.traces {
301 if !(config.include_idle || trace.active) {
303 continue;
304 }
305
306 if config.gil_only && !trace.owns_gil {
308 continue;
309 }
310
311 let own_trace: StackTrace =
313 Into::<StackTraceWrapper>::into((trace.clone(), &backend_config)).into();
314
315 let stacktrace = own_trace + &ruleset.lock()?.clone();
317
318 buffer.lock()?.record(stacktrace)?;
320 }
321 }
322
323 Ok(())
324 }));
325
326 Ok(())
327 }
328
329 fn shutdown(self: Box<Self>) -> Result<()> {
331 log::trace!(target: LOG_TAG, "Shutting down sampler thread");
332
333 self.running.store(false, Ordering::Relaxed);
335
336 self.sampler_thread
338 .ok_or_else(|| PyroscopeError::new("Pyspy: Failed to unwrap Sampler Thread"))?
339 .join()
340 .unwrap_or_else(|_| Err(PyroscopeError::new("Pyspy: Failed to join sampler thread")))?;
341
342 Ok(())
343 }
344
345 fn report(&mut self) -> Result<Vec<Report>> {
347 let report: StackBuffer = self.buffer.lock()?.deref().to_owned();
349 let reports: Vec<Report> = report.into();
350
351 self.buffer.lock()?.clear();
353
354 Ok(reports)
355 }
356}
357
/// Newtype around `StackFrame`, used as the target of the `py_spy::Frame` conversion.
struct StackFrameWrapper(StackFrame);
361
362impl From<StackFrameWrapper> for StackFrame {
363 fn from(stack_frame: StackFrameWrapper) -> Self {
364 stack_frame.0
365 }
366}
367
368impl From<py_spy::Frame> for StackFrameWrapper {
369 fn from(frame: py_spy::Frame) -> Self {
370 StackFrameWrapper(StackFrame {
371 module: frame.module.clone(),
372 name: Some(frame.name.clone()),
373 filename: frame.short_filename.clone(),
374 relative_path: None,
375 absolute_path: Some(frame.filename.clone()),
376 line: Some(frame.line as u32),
377 })
378 }
379}
380
/// Newtype around `StackTrace`, used as the target of the `py_spy::StackTrace` conversion.
struct StackTraceWrapper(StackTrace);
384
385impl From<StackTraceWrapper> for StackTrace {
386 fn from(stack_trace: StackTraceWrapper) -> Self {
387 stack_trace.0
388 }
389}
390
391impl From<(py_spy::StackTrace, &BackendConfig)> for StackTraceWrapper {
392 fn from(arg: (py_spy::StackTrace, &BackendConfig)) -> Self {
393 let (stack_trace, config) = arg;
394 let stacktrace = StackTrace::new(
395 config,
396 Some(stack_trace.pid as u32),
397 Some(stack_trace.thread_id as u64),
398 stack_trace.thread_name.clone(),
399 stack_trace
400 .frames
401 .iter()
402 .map(|frame| Into::<StackFrameWrapper>::into(frame.clone()).into())
403 .collect(),
404 );
405 StackTraceWrapper(stacktrace)
406 }
407}