// py_spy/python_spy.rs

1use std::collections::HashMap;
2#[cfg(all(target_os = "linux", feature = "unwind"))]
3use std::collections::HashSet;
4#[cfg(all(target_os = "linux", feature = "unwind"))]
5use std::iter::FromIterator;
6use std::path::Path;
7
8use anyhow::{Context, Error, Result};
9use remoteprocess::{Pid, Process, ProcessMemory, Tid};
10
11use crate::config::{Config, LockingStrategy};
12#[cfg(feature = "unwind")]
13use crate::native_stack_trace::NativeStack;
14use crate::python_bindings::{
15    v2_7_15, v3_10_0, v3_11_0, v3_12_0, v3_13_0, v3_3_7, v3_5_5, v3_6_6, v3_7_0, v3_8_0, v3_9_5,
16};
17use crate::python_data_access::format_variable;
18use crate::python_interpreters::{InterpreterState, ThreadState};
19use crate::python_process_info::{
20    get_interpreter_address, get_python_version, get_threadstate_address, PythonProcessInfo,
21};
22use crate::python_threading::thread_name_lookup;
23use crate::stack_trace::{get_gil_threadid, get_stack_trace, StackTrace};
24use crate::version::Version;
25
26/// Lets you retrieve stack traces of a running python program
27pub struct PythonSpy {
28    pub pid: Pid,
29    pub process: Process,
30    pub version: Version,
31    pub interpreter_address: usize,
32    pub threadstate_address: usize,
33    pub config: Config,
34    #[cfg(feature = "unwind")]
35    pub native: Option<NativeStack>,
36    pub short_filenames: HashMap<String, Option<String>>,
37    pub python_thread_ids: HashMap<u64, Tid>,
38    pub python_thread_names: HashMap<u64, String>,
39    #[cfg(target_os = "linux")]
40    pub dockerized: bool,
41}
42
43impl PythonSpy {
44    /// Constructs a new PythonSpy object.
45    pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
46        let process = remoteprocess::Process::new(pid)
47            .context("Failed to open process - check if it is running.")?;
48
49        // get basic process information (memory maps/symbols etc)
50        let python_info = PythonProcessInfo::new(&process)?;
51
52        // lock the process when loading up on freebsd (rather than locking
53        // on every memory read). Needs done after getting python process info
54        // because procmaps also tries to attach w/ ptrace on freebsd
55        #[cfg(target_os = "freebsd")]
56        let _lock = process.lock();
57
58        let version = get_python_version(&python_info, &process)?;
59        info!("python version {} detected", version);
60
61        let interpreter_address = get_interpreter_address(&python_info, &process, &version)?;
62        info!("Found interpreter at 0x{:016x}", interpreter_address);
63
64        // lets us figure out which thread has the GIL
65        let threadstate_address = get_threadstate_address(
66            interpreter_address,
67            &python_info,
68            &process,
69            &version,
70            config,
71        )?;
72
73        #[cfg(feature = "unwind")]
74        let native = if config.native {
75            Some(NativeStack::new(
76                pid,
77                python_info.python_binary,
78                python_info.libpython_binary,
79            )?)
80        } else {
81            None
82        };
83
84        Ok(PythonSpy {
85            pid,
86            process,
87            version,
88            interpreter_address,
89            threadstate_address,
90            #[cfg(feature = "unwind")]
91            native,
92            #[cfg(target_os = "linux")]
93            dockerized: python_info.dockerized,
94            config: config.clone(),
95            short_filenames: HashMap::new(),
96            python_thread_ids: HashMap::new(),
97            python_thread_names: HashMap::new(),
98        })
99    }
100
101    /// Creates a PythonSpy object, retrying up to max_retries times.
102    /// Mainly useful for the case where the process is just started and
103    /// symbols or the python interpreter might not be loaded yet.
104    pub fn retry_new(pid: Pid, config: &Config, max_retries: u64) -> Result<PythonSpy, Error> {
105        let mut retries = 0;
106        loop {
107            let err = match PythonSpy::new(pid, config) {
108                Ok(mut process) => {
109                    // verify that we can load a stack trace before returning success
110                    match process.get_stack_traces() {
111                        Ok(_) => return Ok(process),
112                        Err(err) => err,
113                    }
114                }
115                Err(err) => err,
116            };
117
118            // If we failed, retry a couple times before returning the last error
119            retries += 1;
120            if retries >= max_retries {
121                return Err(err);
122            }
123            info!("Failed to connect to process, retrying. Error: {}", err);
124            std::thread::sleep(std::time::Duration::from_millis(20));
125        }
126    }
127
128    /// Gets a StackTrace for each thread in the current process
129    pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, Error> {
130        match self.version {
131            // ABI for 2.3/2.4/2.5/2.6/2.7 is compatible for our purpose
132            Version {
133                major: 2,
134                minor: 3..=7,
135                ..
136            } => self._get_stack_traces::<v2_7_15::_is>(),
137            Version {
138                major: 3, minor: 3, ..
139            } => self._get_stack_traces::<v3_3_7::_is>(),
140            // ABI for 3.4 and 3.5 is the same for our purposes
141            Version {
142                major: 3, minor: 4, ..
143            } => self._get_stack_traces::<v3_5_5::_is>(),
144            Version {
145                major: 3, minor: 5, ..
146            } => self._get_stack_traces::<v3_5_5::_is>(),
147            Version {
148                major: 3, minor: 6, ..
149            } => self._get_stack_traces::<v3_6_6::_is>(),
150            Version {
151                major: 3, minor: 7, ..
152            } => self._get_stack_traces::<v3_7_0::_is>(),
153            // v3.8.0a1 to v3.8.0a3 is compatible with 3.7 ABI, but later versions of 3.8.0 aren't
154            Version {
155                major: 3,
156                minor: 8,
157                patch: 0,
158                ..
159            } => match self.version.release_flags.as_ref() {
160                "a1" | "a2" | "a3" => self._get_stack_traces::<v3_7_0::_is>(),
161                _ => self._get_stack_traces::<v3_8_0::_is>(),
162            },
163            Version {
164                major: 3, minor: 8, ..
165            } => self._get_stack_traces::<v3_8_0::_is>(),
166            Version {
167                major: 3, minor: 9, ..
168            } => self._get_stack_traces::<v3_9_5::_is>(),
169            Version {
170                major: 3,
171                minor: 10,
172                ..
173            } => self._get_stack_traces::<v3_10_0::_is>(),
174            Version {
175                major: 3,
176                minor: 11,
177                ..
178            } => self._get_stack_traces::<v3_11_0::_is>(),
179            Version {
180                major: 3,
181                minor: 12,
182                ..
183            } => self._get_stack_traces::<v3_12_0::_is>(),
184            Version {
185                major: 3,
186                minor: 13,
187                ..
188            } => self._get_stack_traces::<v3_13_0::_is>(),
189            _ => Err(format_err!(
190                "Unsupported version of Python: {}",
191                self.version
192            )),
193        }
194    }
195
196    // implementation of get_stack_traces, where we have a type for the InterpreterState
197    fn _get_stack_traces<I: InterpreterState>(&mut self) -> Result<Vec<StackTrace>, Error> {
198        // Query the OS to get if each thread in the process is running or not
199        let mut thread_activity = HashMap::new();
200        if self.config.gil_only {
201            // Don't need to collect thread activity if we're only getting the
202            // GIL thread: If we're holding the GIL we're by definition active.
203        } else {
204            for thread in self.process.threads()?.iter() {
205                let threadid: Tid = thread.id()?;
206                let Ok(active) = thread.active() else {
207                    // Do not fail all sampling if a single thread died between entering the loop
208                    // and reading its status.
209                    continue;
210                };
211                thread_activity.insert(threadid, active);
212            }
213        }
214
215        // Lock the process if appropriate. Note we have to lock AFTER getting the thread
216        // activity status from the OS (otherwise each thread would report being inactive always).
217        // This has the potential for race conditions (in that the thread activity could change
218        // between getting the status and locking the thread, but seems unavoidable right now
219        let _lock = if self.config.blocking == LockingStrategy::Lock {
220            Some(self.process.lock().context("Failed to suspend process")?)
221        } else {
222            None
223        };
224
225        // Find PyThreadState, and loop over all the python threads
226        let threadstate_ptr_ptr = I::threadstate_ptr_ptr(self.interpreter_address);
227        let threads_head = self
228            .process
229            .copy_pointer(threadstate_ptr_ptr)
230            .context("Failed to copy PyThreadState head pointer")?;
231
232        // get the threadid of the gil if appropriate
233        let gil_thread_id = get_gil_threadid::<I, Process>(self.threadstate_address, &self.process)
234            .context("failed to get gil_thread_id")?;
235
236        let mut traces = Vec::new();
237        let mut threads = threads_head;
238        while !threads.is_null() {
239            // Get the stack trace of the python thread
240            let thread = self
241                .process
242                .copy_pointer(threads)
243                .context("Failed to copy PyThreadState")?;
244            threads = thread.next();
245
246            let python_thread_id = thread.thread_id();
247            let owns_gil = python_thread_id == gil_thread_id;
248
249            if self.config.gil_only && !owns_gil {
250                continue;
251            }
252
253            let mut trace = get_stack_trace(
254                &thread,
255                &self.process,
256                self.config.dump_locals > 0,
257                self.config.lineno,
258            )?;
259
260            // Try getting the native thread id
261
262            // python 3.11+ has the native thread id directly on the PyThreadState object,
263            // for older versions of python, try using OS specific code to get the native
264            // thread id (doesn't work on freebsd, or on arm/i686 processors on linux)
265            if trace.os_thread_id.is_none() {
266                let mut os_thread_id =
267                    self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
268
269                // linux can see issues where pthread_ids get recycled for new OS threads,
270                // which totally breaks the caching we were doing here. Detect this and retry
271                if let Some(tid) = os_thread_id {
272                    if !thread_activity.is_empty() && !thread_activity.contains_key(&tid) {
273                        info!("clearing away thread id caches, thread {} has exited", tid);
274                        self.python_thread_ids.clear();
275                        self.python_thread_names.clear();
276                        os_thread_id =
277                            self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
278                    }
279                }
280
281                trace.os_thread_id = os_thread_id.map(|id| id as u64);
282            }
283
284            trace.thread_name = self._get_python_thread_name(python_thread_id);
285            trace.owns_gil = owns_gil;
286            trace.pid = self.process.pid;
287
288            // Figure out if the thread is sleeping from the OS if possible
289            trace.active = true;
290            if let Some(id) = trace.os_thread_id {
291                let id = id as Tid;
292                if let Some(active) = thread_activity.get(&id as _) {
293                    trace.active = *active;
294                }
295            }
296
297            // fallback to using a heuristic if we think the thread is still active
298            // Note that on linux the OS thread activity can only be gotten on x86_64
299            // processors and even then seems to be wrong occasionally in thinking 'select'
300            // calls are active (which seems related to the thread locking code,
301            // this problem doesn't seem to happen with the --nonblocking option)
302            // Note: this should be done before the native merging for correct results
303            if trace.active {
304                trace.active = !self._heuristic_is_thread_idle(&trace);
305            }
306
307            // Merge in the native stack frames if necessary
308            #[cfg(feature = "unwind")]
309            {
310                if self.config.native {
311                    if let Some(native) = self.native.as_mut() {
312                        let thread_id = trace
313                            .os_thread_id
314                            .ok_or_else(|| format_err!("failed to get os threadid"))?;
315                        let os_thread = remoteprocess::Thread::new(thread_id as Tid)?;
316                        trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
317                    }
318                }
319            }
320
321            for frame in &mut trace.frames {
322                frame.short_filename = self.shorten_filename(&frame.filename);
323                if let Some(locals) = frame.locals.as_mut() {
324                    let max_length = (128 * self.config.dump_locals) as isize;
325                    for local in locals {
326                        let repr = format_variable::<I, Process>(
327                            &self.process,
328                            &self.version,
329                            local.addr,
330                            max_length,
331                        );
332                        local.repr = Some(repr.unwrap_or_else(|_| "?".to_owned()));
333                    }
334                }
335            }
336
337            traces.push(trace);
338
339            // This seems to happen occasionally when scanning BSS addresses for valid interpreters
340            if traces.len() > 4096 {
341                return Err(format_err!("Max thread recursion depth reached"));
342            }
343
344            if self.config.gil_only {
345                // There's only one GIL thread and we've captured it, so we can
346                // stop now
347                break;
348            }
349        }
350        Ok(traces)
351    }
352
353    // heuristic fallback for determining if a thread is active, used
354    // when we don't have the ability to get the thread information from the OS
355    fn _heuristic_is_thread_idle(&self, trace: &StackTrace) -> bool {
356        let frames = &trace.frames;
357        if frames.is_empty() {
358            // we could have 0 python frames, but still be active running native
359            // code.
360            false
361        } else {
362            let frame = &frames[0];
363            (frame.name == "wait" && frame.filename.ends_with("threading.py"))
364                || (frame.name == "select" && frame.filename.ends_with("selectors.py"))
365                || (frame.name == "poll"
366                    && (frame.filename.ends_with("asyncore.py")
367                        || frame.filename.contains("zmq")
368                        || frame.filename.contains("gevent")
369                        || frame.filename.contains("tornado")))
370        }
371    }
372
373    #[cfg(windows)]
374    fn _get_os_thread_id<I: InterpreterState>(
375        &mut self,
376        python_thread_id: u64,
377        _interp_head: *const I::ThreadState,
378    ) -> Result<Option<Tid>, Error> {
379        Ok(Some(python_thread_id as Tid))
380    }
381
382    #[cfg(target_os = "macos")]
383    fn _get_os_thread_id<I: InterpreterState>(
384        &mut self,
385        python_thread_id: u64,
386        _interp_head: *const I::ThreadState,
387    ) -> Result<Option<Tid>, Error> {
388        // If we've already know this threadid, we're good
389        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
390            return Ok(Some(*thread_id));
391        }
392
393        for thread in self.process.threads()?.iter() {
394            // ok, this is crazy pants. is this 224 constant right?  Is this right for all versions of OSX? how is this determined?
395            // is this correct for all versions of python? Why does this even work?
396            let current_handle = thread.thread_handle()? - 224;
397            self.python_thread_ids.insert(current_handle, thread.id()?);
398        }
399
400        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
401            return Ok(Some(*thread_id));
402        }
403        Ok(None)
404    }
405
406    #[cfg(all(target_os = "linux", not(feature = "unwind")))]
407    fn _get_os_thread_id<I: InterpreterState>(
408        &mut self,
409        _python_thread_id: u64,
410        _interp_head: *const I::ThreadState,
411    ) -> Result<Option<Tid>, Error> {
412        Ok(None)
413    }
414
415    #[cfg(all(target_os = "linux", feature = "unwind"))]
416    fn _get_os_thread_id<I: InterpreterState>(
417        &mut self,
418        python_thread_id: u64,
419        interp_head: *const I::ThreadState,
420    ) -> Result<Option<Tid>, Error> {
421        // in nonblocking mode, we can't get the threadid reliably (method here requires reading the RBX
422        // register which requires a ptrace attach). fallback to heuristic thread activity here
423        if self.config.blocking == LockingStrategy::NonBlocking {
424            return Ok(None);
425        }
426
427        // likewise this doesn't yet work for profiling processes running inside docker containers from the host os
428        if self.dockerized {
429            return Ok(None);
430        }
431
432        // If we've already know this threadid, we're good
433        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
434            return Ok(Some(*thread_id));
435        }
436
437        // Get a list of all the python thread ids
438        let mut all_python_threads = HashSet::new();
439        let mut threads = interp_head;
440        while !threads.is_null() {
441            let thread = self
442                .process
443                .copy_pointer(threads)
444                .context("Failed to copy PyThreadState")?;
445            let current = thread.thread_id();
446            all_python_threads.insert(current);
447            threads = thread.next();
448        }
449
450        let processed_os_threads: HashSet<Tid> =
451            HashSet::from_iter(self.python_thread_ids.values().copied());
452
453        let unwinder = self.process.unwinder()?;
454
455        // Try getting the pthread_id from the native stack registers for threads we haven't looked up yet
456        for thread in self.process.threads()?.iter() {
457            let threadid = thread.id()?;
458            if processed_os_threads.contains(&threadid) {
459                continue;
460            }
461
462            match self._get_pthread_id(&unwinder, thread, &all_python_threads) {
463                Ok(pthread_id) => {
464                    if pthread_id != 0 {
465                        self.python_thread_ids.insert(pthread_id, threadid);
466                    }
467                }
468                Err(e) => {
469                    warn!("Failed to get get_pthread_id for {}: {}", threadid, e);
470                }
471            };
472        }
473
474        // we can't get the python threadid for the main thread from registers,
475        // so instead assign the main threadid (pid) to the missing python thread
476        if !processed_os_threads.contains(&self.pid) {
477            let mut unknown_python_threadids = HashSet::new();
478            for python_thread_id in all_python_threads.iter() {
479                if !self.python_thread_ids.contains_key(python_thread_id) {
480                    unknown_python_threadids.insert(*python_thread_id);
481                }
482            }
483
484            if unknown_python_threadids.len() == 1 {
485                let python_thread_id = *unknown_python_threadids.iter().next().unwrap();
486                self.python_thread_ids.insert(python_thread_id, self.pid);
487            } else {
488                warn!("failed to get python threadid for main thread!");
489            }
490        }
491
492        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
493            return Ok(Some(*thread_id));
494        }
495        info!("failed looking up python threadid for {}. known python_thread_ids {:?}. all_python_threads {:?}",
496            python_thread_id, self.python_thread_ids, all_python_threads);
497        Ok(None)
498    }
499
500    #[cfg(all(target_os = "linux", feature = "unwind"))]
501    pub fn _get_pthread_id(
502        &self,
503        unwinder: &remoteprocess::Unwinder,
504        thread: &remoteprocess::Thread,
505        threadids: &HashSet<u64>,
506    ) -> Result<u64, Error> {
507        let mut pthread_id = 0;
508
509        let mut cursor = unwinder.cursor(thread)?;
510        while let Some(_) = cursor.next() {
511            // the pthread_id is usually a register (rbx on x86-64, r5 on ARM) in the top-level
512            // frame of the thread, but on some configs can be 2nd level. Handle this by taking the
513            // top-most value that is one of the pthread_ids we're looking for
514            #[cfg(target_arch = "x86_64")]
515            let possible_threadid = cursor.bx();
516            #[cfg(target_arch = "arm")]
517            let possible_threadid = cursor.r5();
518            if let Ok(reg) = possible_threadid {
519                if reg != 0 && threadids.contains(&reg) {
520                    pthread_id = reg;
521                }
522            }
523        }
524
525        Ok(pthread_id)
526    }
527
528    #[cfg(target_os = "freebsd")]
529    fn _get_os_thread_id<I: InterpreterState>(
530        &mut self,
531        _python_thread_id: u64,
532        _interp_head: *const I::ThreadState,
533    ) -> Result<Option<Tid>, Error> {
534        Ok(None)
535    }
536
537    fn _get_python_thread_name(&mut self, python_thread_id: u64) -> Option<String> {
538        match self.python_thread_names.get(&python_thread_id) {
539            Some(thread_name) => Some(thread_name.clone()),
540            None => {
541                self.python_thread_names = thread_name_lookup(self).unwrap_or_default();
542                self.python_thread_names.get(&python_thread_id).cloned()
543            }
544        }
545    }
546
547    /// We want to display filenames without the boilerplate of the python installation
548    /// directory etc. This function looks only includes paths inside a python
549    /// package or subpackage, and not the path the package is installed at
550    fn shorten_filename(&mut self, filename: &str) -> Option<String> {
551        // if the user requested full filenames, skip shortening
552        if self.config.full_filenames {
553            return Some(filename.to_string());
554        }
555
556        // if we have figured out the short filename already, use it
557        if let Some(short) = self.short_filenames.get(filename) {
558            return short.clone();
559        }
560
561        // on linux the process could be running in docker, access the filename through procfs
562        #[cfg(target_os = "linux")]
563        let filename_storage;
564
565        #[cfg(target_os = "linux")]
566        let filename = if self.dockerized {
567            filename_storage = format!("/proc/{}/root{}", self.pid, filename);
568            if Path::new(&filename_storage).exists() {
569                &filename_storage
570            } else {
571                filename
572            }
573        } else {
574            filename
575        };
576
577        // only include paths that include an __init__.py
578        let mut path = Path::new(filename);
579        while let Some(parent) = path.parent() {
580            path = parent;
581            if !parent.join("__init__.py").exists() {
582                break;
583            }
584        }
585
586        // remove the parent prefix and convert to an optional string
587        let shortened = Path::new(filename)
588            .strip_prefix(path)
589            .ok()
590            .map(|p| p.to_string_lossy().to_string());
591
592        self.short_filenames
593            .insert(filename.to_owned(), shortened.clone());
594        shortened
595    }
596}