Skip to main content

py_spy/
python_spy.rs

1use std::collections::HashMap;
2#[cfg(all(target_os = "linux", feature = "unwind"))]
3use std::collections::HashSet;
4#[cfg(all(target_os = "linux", feature = "unwind"))]
5use std::iter::FromIterator;
6use std::path::Path;
7
8use anyhow::{Context, Error, Result};
9use remoteprocess::{Pid, Process, ProcessMemory, Tid};
10
11use crate::config::{Config, LockingStrategy};
12#[cfg(feature = "unwind")]
13use crate::native_stack_trace::NativeStack;
14use crate::python_bindings::{
15    v2_7_15, v3_10_0, v3_11_0, v3_12_0, v3_13_0, v3_14_0, v3_3_7, v3_5_5, v3_6_6, v3_7_0, v3_8_0,
16    v3_9_5,
17};
18use crate::python_data_access::format_variable;
19use crate::python_interpreters::{InterpreterState, ThreadState};
20use crate::python_process_info::{
21    get_interpreter_address, get_python_version, get_threadstate_address, PythonProcessInfo,
22};
23use crate::python_threading::thread_name_lookup;
24use crate::stack_trace::{get_gil_threadid, get_stack_trace, StackTrace};
25use crate::version::Version;
26
27/// Lets you retrieve stack traces of a running python program
28pub struct PythonSpy {
29    pub pid: Pid,
30    pub process: Process,
31    pub version: Version,
32    pub interpreter_address: usize,
33    pub threadstate_address: usize,
34    pub config: Config,
35    #[cfg(feature = "unwind")]
36    pub native: Option<NativeStack>,
37    pub short_filenames: HashMap<String, Option<String>>,
38    pub python_thread_ids: HashMap<u64, Tid>,
39    pub python_thread_names: HashMap<u64, String>,
40    #[cfg(target_os = "linux")]
41    pub dockerized: bool,
42}
43
44impl PythonSpy {
45    /// Constructs a new PythonSpy object.
46    pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
47        let process = remoteprocess::Process::new(pid)
48            .context("Failed to open process - check if it is running.")?;
49
50        // get basic process information (memory maps/symbols etc)
51        let python_info = PythonProcessInfo::new(&process)?;
52
53        // lock the process when loading up on freebsd (rather than locking
54        // on every memory read). Needs done after getting python process info
55        // because procmaps also tries to attach w/ ptrace on freebsd
56        #[cfg(target_os = "freebsd")]
57        let _lock = process.lock();
58
59        let version = get_python_version(&python_info, &process)?;
60        info!("python version {} detected", version);
61
62        let interpreter_address = get_interpreter_address(&python_info, &process, &version)?;
63        info!("Found interpreter at 0x{:016x}", interpreter_address);
64
65        // lets us figure out which thread has the GIL
66        let threadstate_address = get_threadstate_address(
67            interpreter_address,
68            &python_info,
69            &process,
70            &version,
71            config,
72        )?;
73
74        #[cfg(feature = "unwind")]
75        let native = if config.native {
76            Some(NativeStack::new(
77                pid,
78                python_info.python_binary,
79                python_info.libpython_binary,
80            )?)
81        } else {
82            None
83        };
84
85        Ok(PythonSpy {
86            pid,
87            process,
88            version,
89            interpreter_address,
90            threadstate_address,
91            #[cfg(feature = "unwind")]
92            native,
93            #[cfg(target_os = "linux")]
94            dockerized: python_info.dockerized,
95            config: config.clone(),
96            short_filenames: HashMap::new(),
97            python_thread_ids: HashMap::new(),
98            python_thread_names: HashMap::new(),
99        })
100    }
101
102    /// Creates a PythonSpy object, retrying up to max_retries times.
103    /// Mainly useful for the case where the process is just started and
104    /// symbols or the python interpreter might not be loaded yet.
105    pub fn retry_new(pid: Pid, config: &Config, max_retries: u64) -> Result<PythonSpy, Error> {
106        let mut retries = 0;
107        loop {
108            let err = match PythonSpy::new(pid, config) {
109                Ok(mut process) => {
110                    // verify that we can load a stack trace before returning success
111                    match process.get_stack_traces() {
112                        Ok(_) => return Ok(process),
113                        Err(err) => err,
114                    }
115                }
116                Err(err) => err,
117            };
118
119            // If we failed, retry a couple times before returning the last error
120            retries += 1;
121            if retries >= max_retries {
122                return Err(err);
123            }
124            info!("Failed to connect to process, retrying. Error: {}", err);
125            std::thread::sleep(std::time::Duration::from_millis(20));
126        }
127    }
128
129    /// Gets a StackTrace for each thread in the current process
130    pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, Error> {
131        match self.version {
132            // ABI for 2.3/2.4/2.5/2.6/2.7 is compatible for our purpose
133            Version {
134                major: 2,
135                minor: 3..=7,
136                ..
137            } => self._get_stack_traces::<v2_7_15::_is>(),
138            Version {
139                major: 3, minor: 3, ..
140            } => self._get_stack_traces::<v3_3_7::_is>(),
141            // ABI for 3.4 and 3.5 is the same for our purposes
142            Version {
143                major: 3, minor: 4, ..
144            } => self._get_stack_traces::<v3_5_5::_is>(),
145            Version {
146                major: 3, minor: 5, ..
147            } => self._get_stack_traces::<v3_5_5::_is>(),
148            Version {
149                major: 3, minor: 6, ..
150            } => self._get_stack_traces::<v3_6_6::_is>(),
151            Version {
152                major: 3, minor: 7, ..
153            } => self._get_stack_traces::<v3_7_0::_is>(),
154            Version {
155                major: 3, minor: 8, ..
156            } => self._get_stack_traces::<v3_8_0::_is>(),
157            Version {
158                major: 3, minor: 9, ..
159            } => self._get_stack_traces::<v3_9_5::_is>(),
160            Version {
161                major: 3,
162                minor: 10,
163                ..
164            } => self._get_stack_traces::<v3_10_0::_is>(),
165            Version {
166                major: 3,
167                minor: 11,
168                ..
169            } => self._get_stack_traces::<v3_11_0::_is>(),
170            Version {
171                major: 3,
172                minor: 12,
173                ..
174            } => self._get_stack_traces::<v3_12_0::_is>(),
175            Version {
176                major: 3,
177                minor: 13,
178                ..
179            } => self._get_stack_traces::<v3_13_0::_is>(),
180            Version {
181                major: 3,
182                minor: 14,
183                ..
184            } => self._get_stack_traces::<v3_14_0::_is>(),
185            _ => Err(format_err!(
186                "Unsupported version of Python: {}",
187                self.version
188            )),
189        }
190    }
191
192    // implementation of get_stack_traces, where we have a type for the InterpreterState
193    fn _get_stack_traces<I: InterpreterState>(&mut self) -> Result<Vec<StackTrace>, Error> {
194        // Query the OS to get if each thread in the process is running or not
195        let mut thread_activity = HashMap::new();
196        if self.config.gil_only {
197            // Don't need to collect thread activity if we're only getting the
198            // GIL thread: If we're holding the GIL we're by definition active.
199        } else {
200            for thread in self.process.threads()?.iter() {
201                let threadid: Tid = thread.id()?;
202                let Ok(active) = thread.active() else {
203                    // Do not fail all sampling if a single thread died between entering the loop
204                    // and reading its status.
205                    continue;
206                };
207                thread_activity.insert(threadid, active);
208            }
209        }
210
211        // Lock the process if appropriate. Note we have to lock AFTER getting the thread
212        // activity status from the OS (otherwise each thread would report being inactive always).
213        // This has the potential for race conditions (in that the thread activity could change
214        // between getting the status and locking the thread, but seems unavoidable right now
215        let _lock = if self.config.blocking == LockingStrategy::Lock {
216            Some(self.process.lock().context("Failed to suspend process")?)
217        } else {
218            None
219        };
220
221        // Find PyThreadState, and loop over all the python threads
222        let threadstate_ptr_ptr = I::threadstate_ptr_ptr(self.interpreter_address);
223        let threads_head = self
224            .process
225            .copy_pointer(threadstate_ptr_ptr)
226            .context("Failed to copy PyThreadState head pointer")?;
227
228        // get the threadid of the gil if appropriate
229        let gil_thread_id = get_gil_threadid::<I, Process>(self.threadstate_address, &self.process)
230            .context("failed to get gil_thread_id")?;
231
232        let mut traces = Vec::new();
233        let mut threads = threads_head;
234        while !threads.is_null() {
235            // Get the stack trace of the python thread
236            let thread = self
237                .process
238                .copy_pointer(threads)
239                .context("Failed to copy PyThreadState")?;
240            threads = thread.next();
241
242            let python_thread_id = thread.thread_id();
243            let owns_gil = python_thread_id == gil_thread_id;
244
245            if self.config.gil_only && !owns_gil {
246                continue;
247            }
248
249            let mut trace = get_stack_trace(
250                &thread,
251                &self.process,
252                self.config.dump_locals > 0,
253                self.config.lineno,
254            )
255            .with_context(|| {
256                format!(
257                    "Failed to call get_stack_trace for thread {}",
258                    python_thread_id
259                )
260            })?;
261
262            // Try getting the native thread id
263
264            // python 3.11+ has the native thread id directly on the PyThreadState object,
265            // for older versions of python, try using OS specific code to get the native
266            // thread id (doesn't work on freebsd, or on arm/i686 processors on linux)
267            if trace.os_thread_id.is_none() {
268                let mut os_thread_id =
269                    self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
270
271                // linux can see issues where pthread_ids get recycled for new OS threads,
272                // which totally breaks the caching we were doing here. Detect this and retry
273                if let Some(tid) = os_thread_id {
274                    if !thread_activity.is_empty() && !thread_activity.contains_key(&tid) {
275                        info!("clearing away thread id caches, thread {} has exited", tid);
276                        self.python_thread_ids.clear();
277                        self.python_thread_names.clear();
278                        os_thread_id =
279                            self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
280                    }
281                }
282
283                trace.os_thread_id = os_thread_id.map(|id| id as u64);
284            }
285
286            trace.thread_name = self._get_python_thread_name(python_thread_id);
287            trace.owns_gil = owns_gil;
288            trace.pid = self.process.pid;
289
290            // Figure out if the thread is sleeping from the OS if possible
291            trace.active = true;
292            if let Some(id) = trace.os_thread_id {
293                let id = id as Tid;
294                if let Some(active) = thread_activity.get(&id as _) {
295                    trace.active = *active;
296                }
297            }
298
299            // fallback to using a heuristic if we think the thread is still active
300            // Note that on linux the OS thread activity can only be gotten on x86_64
301            // processors and even then seems to be wrong occasionally in thinking 'select'
302            // calls are active (which seems related to the thread locking code,
303            // this problem doesn't seem to happen with the --nonblocking option)
304            // Note: this should be done before the native merging for correct results
305            if trace.active {
306                trace.active = !self._heuristic_is_thread_idle(&trace);
307            }
308
309            // Merge in the native stack frames if necessary
310            #[cfg(feature = "unwind")]
311            {
312                if self.config.native {
313                    if let Some(native) = self.native.as_mut() {
314                        let thread_id = trace
315                            .os_thread_id
316                            .ok_or_else(|| format_err!("failed to get os threadid"))?;
317                        let os_thread = remoteprocess::Thread::new(thread_id as Tid)?;
318                        trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
319                    }
320                }
321            }
322
323            for frame in &mut trace.frames {
324                frame.short_filename = self.shorten_filename(&frame.filename);
325                if let Some(locals) = frame.locals.as_mut() {
326                    let max_length = (128 * self.config.dump_locals) as isize;
327                    for local in locals {
328                        let repr = format_variable::<I, Process>(
329                            &self.process,
330                            &self.version,
331                            local.addr,
332                            max_length,
333                        );
334                        local.repr = Some(repr.unwrap_or_else(|_| "?".to_owned()));
335                    }
336                }
337            }
338
339            traces.push(trace);
340
341            // This seems to happen occasionally when scanning BSS addresses for valid interpreters
342            if traces.len() > 4096 {
343                return Err(format_err!("Max thread recursion depth reached"));
344            }
345
346            if self.config.gil_only {
347                // There's only one GIL thread and we've captured it, so we can
348                // stop now
349                break;
350            }
351        }
352        Ok(traces)
353    }
354
355    // heuristic fallback for determining if a thread is active, used
356    // when we don't have the ability to get the thread information from the OS
357    fn _heuristic_is_thread_idle(&self, trace: &StackTrace) -> bool {
358        let frames = &trace.frames;
359        if frames.is_empty() {
360            // we could have 0 python frames, but still be active running native
361            // code.
362            false
363        } else {
364            let frame = &frames[0];
365            (frame.name == "wait" && frame.filename.ends_with("threading.py"))
366                || (frame.name == "select" && frame.filename.ends_with("selectors.py"))
367                || (frame.name == "poll"
368                    && (frame.filename.ends_with("asyncore.py")
369                        || frame.filename.contains("zmq")
370                        || frame.filename.contains("gevent")
371                        || frame.filename.contains("tornado")))
372        }
373    }
374
375    #[cfg(windows)]
376    fn _get_os_thread_id<I: InterpreterState>(
377        &mut self,
378        python_thread_id: u64,
379        _interp_head: *const I::ThreadState,
380    ) -> Result<Option<Tid>, Error> {
381        Ok(Some(python_thread_id as Tid))
382    }
383
384    #[cfg(target_os = "macos")]
385    fn _get_os_thread_id<I: InterpreterState>(
386        &mut self,
387        python_thread_id: u64,
388        _interp_head: *const I::ThreadState,
389    ) -> Result<Option<Tid>, Error> {
390        // If we've already know this threadid, we're good
391        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
392            return Ok(Some(*thread_id));
393        }
394
395        for thread in self.process.threads()?.iter() {
396            // ok, this is crazy pants. is this 224 constant right?  Is this right for all versions of OSX? how is this determined?
397            // is this correct for all versions of python? Why does this even work?
398            let current_handle = thread.thread_handle()? - 224;
399            self.python_thread_ids.insert(current_handle, thread.id()?);
400        }
401
402        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
403            return Ok(Some(*thread_id));
404        }
405        Ok(None)
406    }
407
408    #[cfg(all(target_os = "linux", not(feature = "unwind")))]
409    fn _get_os_thread_id<I: InterpreterState>(
410        &mut self,
411        _python_thread_id: u64,
412        _interp_head: *const I::ThreadState,
413    ) -> Result<Option<Tid>, Error> {
414        Ok(None)
415    }
416
417    #[cfg(all(target_os = "linux", feature = "unwind"))]
418    fn _get_os_thread_id<I: InterpreterState>(
419        &mut self,
420        python_thread_id: u64,
421        interp_head: *const I::ThreadState,
422    ) -> Result<Option<Tid>, Error> {
423        // in nonblocking mode, we can't get the threadid reliably (method here requires reading the RBX
424        // register which requires a ptrace attach). fallback to heuristic thread activity here
425        if self.config.blocking == LockingStrategy::NonBlocking {
426            return Ok(None);
427        }
428
429        // likewise this doesn't yet work for profiling processes running inside docker containers from the host os
430        if self.dockerized {
431            return Ok(None);
432        }
433
434        // If we've already know this threadid, we're good
435        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
436            return Ok(Some(*thread_id));
437        }
438
439        // Get a list of all the python thread ids
440        let mut all_python_threads = HashSet::new();
441        let mut threads = interp_head;
442        while !threads.is_null() {
443            let thread = self
444                .process
445                .copy_pointer(threads)
446                .context("Failed to copy PyThreadState")?;
447            let current = thread.thread_id();
448            all_python_threads.insert(current);
449            threads = thread.next();
450        }
451
452        let processed_os_threads: HashSet<Tid> =
453            HashSet::from_iter(self.python_thread_ids.values().copied());
454
455        let unwinder = self.process.unwinder()?;
456
457        // Try getting the pthread_id from the native stack registers for threads we haven't looked up yet
458        for thread in self.process.threads()?.iter() {
459            let threadid = thread.id()?;
460            if processed_os_threads.contains(&threadid) {
461                continue;
462            }
463
464            match self._get_pthread_id(&unwinder, thread, &all_python_threads) {
465                Ok(pthread_id) => {
466                    if pthread_id != 0 {
467                        self.python_thread_ids.insert(pthread_id, threadid);
468                    }
469                }
470                Err(e) => {
471                    warn!("Failed to get get_pthread_id for {}: {}", threadid, e);
472                }
473            };
474        }
475
476        // we can't get the python threadid for the main thread from registers,
477        // so instead assign the main threadid (pid) to the missing python thread
478        if !processed_os_threads.contains(&self.pid) {
479            let mut unknown_python_threadids = HashSet::new();
480            for python_thread_id in all_python_threads.iter() {
481                if !self.python_thread_ids.contains_key(python_thread_id) {
482                    unknown_python_threadids.insert(*python_thread_id);
483                }
484            }
485
486            if unknown_python_threadids.len() == 1 {
487                let python_thread_id = *unknown_python_threadids.iter().next().unwrap();
488                self.python_thread_ids.insert(python_thread_id, self.pid);
489            } else {
490                warn!("failed to get python threadid for main thread!");
491            }
492        }
493
494        if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
495            return Ok(Some(*thread_id));
496        }
497        info!("failed looking up python threadid for {}. known python_thread_ids {:?}. all_python_threads {:?}",
498            python_thread_id, self.python_thread_ids, all_python_threads);
499        Ok(None)
500    }
501
502    #[cfg(all(target_os = "linux", feature = "unwind"))]
503    pub fn _get_pthread_id(
504        &self,
505        unwinder: &remoteprocess::Unwinder,
506        thread: &remoteprocess::Thread,
507        threadids: &HashSet<u64>,
508    ) -> Result<u64, Error> {
509        let mut pthread_id = 0;
510
511        let mut cursor = unwinder.cursor(thread)?;
512        while let Some(_) = cursor.next() {
513            // the pthread_id is usually a register (rbx on x86-64, r5 on ARM) in the top-level
514            // frame of the thread, but on some configs can be 2nd level. Handle this by taking the
515            // top-most value that is one of the pthread_ids we're looking for
516            #[cfg(target_arch = "x86_64")]
517            let possible_threadid = cursor.bx();
518            #[cfg(target_arch = "arm")]
519            let possible_threadid = cursor.r5();
520            #[cfg(target_arch = "aarch64")]
521            let possible_threadid = unsafe { cursor.register(19) };
522            if let Ok(reg) = possible_threadid {
523                if reg != 0 && threadids.contains(&reg) {
524                    pthread_id = reg;
525                }
526            }
527        }
528
529        Ok(pthread_id)
530    }
531
532    #[cfg(target_os = "freebsd")]
533    fn _get_os_thread_id<I: InterpreterState>(
534        &mut self,
535        _python_thread_id: u64,
536        _interp_head: *const I::ThreadState,
537    ) -> Result<Option<Tid>, Error> {
538        Ok(None)
539    }
540
541    fn _get_python_thread_name(&mut self, python_thread_id: u64) -> Option<String> {
542        match self.python_thread_names.get(&python_thread_id) {
543            Some(thread_name) => Some(thread_name.clone()),
544            None => {
545                self.python_thread_names = thread_name_lookup(self).unwrap_or_default();
546                self.python_thread_names.get(&python_thread_id).cloned()
547            }
548        }
549    }
550
551    /// We want to display filenames without the boilerplate of the python installation
552    /// directory etc. This function looks only includes paths inside a python
553    /// package or subpackage, and not the path the package is installed at
554    fn shorten_filename(&mut self, filename: &str) -> Option<String> {
555        // if the user requested full filenames, skip shortening
556        if self.config.full_filenames {
557            return Some(filename.to_string());
558        }
559
560        // if we have figured out the short filename already, use it
561        if let Some(short) = self.short_filenames.get(filename) {
562            return short.clone();
563        }
564
565        // on linux the process could be running in docker, access the filename through procfs
566        #[cfg(target_os = "linux")]
567        let filename_storage;
568
569        #[cfg(target_os = "linux")]
570        let filename = if self.dockerized {
571            filename_storage = format!("/proc/{}/root{}", self.pid, filename);
572            if Path::new(&filename_storage).exists() {
573                &filename_storage
574            } else {
575                filename
576            }
577        } else {
578            filename
579        };
580
581        // only include paths that include an __init__.py
582        let mut path = Path::new(filename);
583        while let Some(parent) = path.parent() {
584            path = parent;
585            if !parent.join("__init__.py").exists() {
586                break;
587            }
588        }
589
590        // remove the parent prefix and convert to an optional string
591        let shortened = Path::new(filename)
592            .strip_prefix(path)
593            .ok()
594            .map(|p| p.to_string_lossy().to_string());
595
596        self.short_filenames
597            .insert(filename.to_owned(), shortened.clone());
598        shortened
599    }
600}