1use std::collections::HashMap;
2#[cfg(all(target_os = "linux", feature = "unwind"))]
3use std::collections::HashSet;
4#[cfg(all(target_os = "linux", feature = "unwind"))]
5use std::iter::FromIterator;
6use std::path::Path;
7
8use anyhow::{Context, Error, Result};
9use remoteprocess::{Pid, Process, ProcessMemory, Tid};
10
11use crate::config::{Config, LockingStrategy};
12#[cfg(feature = "unwind")]
13use crate::native_stack_trace::NativeStack;
14use crate::python_bindings::{
15 v2_7_15, v3_10_0, v3_11_0, v3_12_0, v3_13_0, v3_14_0, v3_3_7, v3_5_5, v3_6_6, v3_7_0, v3_8_0,
16 v3_9_5,
17};
18use crate::python_data_access::format_variable;
19use crate::python_interpreters::{InterpreterState, ThreadState};
20use crate::python_process_info::{
21 get_interpreter_address, get_python_version, get_threadstate_address, PythonProcessInfo,
22};
23use crate::python_threading::thread_name_lookup;
24use crate::stack_trace::{get_gil_threadid, get_stack_trace, StackTrace};
25use crate::version::Version;
26
27pub struct PythonSpy {
29 pub pid: Pid,
30 pub process: Process,
31 pub version: Version,
32 pub interpreter_address: usize,
33 pub threadstate_address: usize,
34 pub config: Config,
35 #[cfg(feature = "unwind")]
36 pub native: Option<NativeStack>,
37 pub short_filenames: HashMap<String, Option<String>>,
38 pub python_thread_ids: HashMap<u64, Tid>,
39 pub python_thread_names: HashMap<u64, String>,
40 #[cfg(target_os = "linux")]
41 pub dockerized: bool,
42}
43
44impl PythonSpy {
45 pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
47 let process = remoteprocess::Process::new(pid)
48 .context("Failed to open process - check if it is running.")?;
49
50 let python_info = PythonProcessInfo::new(&process)?;
52
53 #[cfg(target_os = "freebsd")]
57 let _lock = process.lock();
58
59 let version = get_python_version(&python_info, &process)?;
60 info!("python version {} detected", version);
61
62 let interpreter_address = get_interpreter_address(&python_info, &process, &version)?;
63 info!("Found interpreter at 0x{:016x}", interpreter_address);
64
65 let threadstate_address = get_threadstate_address(
67 interpreter_address,
68 &python_info,
69 &process,
70 &version,
71 config,
72 )?;
73
74 #[cfg(feature = "unwind")]
75 let native = if config.native {
76 Some(NativeStack::new(
77 pid,
78 python_info.python_binary,
79 python_info.libpython_binary,
80 )?)
81 } else {
82 None
83 };
84
85 Ok(PythonSpy {
86 pid,
87 process,
88 version,
89 interpreter_address,
90 threadstate_address,
91 #[cfg(feature = "unwind")]
92 native,
93 #[cfg(target_os = "linux")]
94 dockerized: python_info.dockerized,
95 config: config.clone(),
96 short_filenames: HashMap::new(),
97 python_thread_ids: HashMap::new(),
98 python_thread_names: HashMap::new(),
99 })
100 }
101
102 pub fn retry_new(pid: Pid, config: &Config, max_retries: u64) -> Result<PythonSpy, Error> {
106 let mut retries = 0;
107 loop {
108 let err = match PythonSpy::new(pid, config) {
109 Ok(mut process) => {
110 match process.get_stack_traces() {
112 Ok(_) => return Ok(process),
113 Err(err) => err,
114 }
115 }
116 Err(err) => err,
117 };
118
119 retries += 1;
121 if retries >= max_retries {
122 return Err(err);
123 }
124 info!("Failed to connect to process, retrying. Error: {}", err);
125 std::thread::sleep(std::time::Duration::from_millis(20));
126 }
127 }
128
129 pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, Error> {
131 match self.version {
132 Version {
134 major: 2,
135 minor: 3..=7,
136 ..
137 } => self._get_stack_traces::<v2_7_15::_is>(),
138 Version {
139 major: 3, minor: 3, ..
140 } => self._get_stack_traces::<v3_3_7::_is>(),
141 Version {
143 major: 3, minor: 4, ..
144 } => self._get_stack_traces::<v3_5_5::_is>(),
145 Version {
146 major: 3, minor: 5, ..
147 } => self._get_stack_traces::<v3_5_5::_is>(),
148 Version {
149 major: 3, minor: 6, ..
150 } => self._get_stack_traces::<v3_6_6::_is>(),
151 Version {
152 major: 3, minor: 7, ..
153 } => self._get_stack_traces::<v3_7_0::_is>(),
154 Version {
155 major: 3, minor: 8, ..
156 } => self._get_stack_traces::<v3_8_0::_is>(),
157 Version {
158 major: 3, minor: 9, ..
159 } => self._get_stack_traces::<v3_9_5::_is>(),
160 Version {
161 major: 3,
162 minor: 10,
163 ..
164 } => self._get_stack_traces::<v3_10_0::_is>(),
165 Version {
166 major: 3,
167 minor: 11,
168 ..
169 } => self._get_stack_traces::<v3_11_0::_is>(),
170 Version {
171 major: 3,
172 minor: 12,
173 ..
174 } => self._get_stack_traces::<v3_12_0::_is>(),
175 Version {
176 major: 3,
177 minor: 13,
178 ..
179 } => self._get_stack_traces::<v3_13_0::_is>(),
180 Version {
181 major: 3,
182 minor: 14,
183 ..
184 } => self._get_stack_traces::<v3_14_0::_is>(),
185 _ => Err(format_err!(
186 "Unsupported version of Python: {}",
187 self.version
188 )),
189 }
190 }
191
192 fn _get_stack_traces<I: InterpreterState>(&mut self) -> Result<Vec<StackTrace>, Error> {
194 let mut thread_activity = HashMap::new();
196 if self.config.gil_only {
197 } else {
200 for thread in self.process.threads()?.iter() {
201 let threadid: Tid = thread.id()?;
202 let Ok(active) = thread.active() else {
203 continue;
206 };
207 thread_activity.insert(threadid, active);
208 }
209 }
210
211 let _lock = if self.config.blocking == LockingStrategy::Lock {
216 Some(self.process.lock().context("Failed to suspend process")?)
217 } else {
218 None
219 };
220
221 let threadstate_ptr_ptr = I::threadstate_ptr_ptr(self.interpreter_address);
223 let threads_head = self
224 .process
225 .copy_pointer(threadstate_ptr_ptr)
226 .context("Failed to copy PyThreadState head pointer")?;
227
228 let gil_thread_id = get_gil_threadid::<I, Process>(self.threadstate_address, &self.process)
230 .context("failed to get gil_thread_id")?;
231
232 let mut traces = Vec::new();
233 let mut threads = threads_head;
234 while !threads.is_null() {
235 let thread = self
237 .process
238 .copy_pointer(threads)
239 .context("Failed to copy PyThreadState")?;
240 threads = thread.next();
241
242 let python_thread_id = thread.thread_id();
243 let owns_gil = python_thread_id == gil_thread_id;
244
245 if self.config.gil_only && !owns_gil {
246 continue;
247 }
248
249 let mut trace = get_stack_trace(
250 &thread,
251 &self.process,
252 self.config.dump_locals > 0,
253 self.config.lineno,
254 )
255 .with_context(|| {
256 format!(
257 "Failed to call get_stack_trace for thread {}",
258 python_thread_id
259 )
260 })?;
261
262 if trace.os_thread_id.is_none() {
268 let mut os_thread_id =
269 self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
270
271 if let Some(tid) = os_thread_id {
274 if !thread_activity.is_empty() && !thread_activity.contains_key(&tid) {
275 info!("clearing away thread id caches, thread {} has exited", tid);
276 self.python_thread_ids.clear();
277 self.python_thread_names.clear();
278 os_thread_id =
279 self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
280 }
281 }
282
283 trace.os_thread_id = os_thread_id.map(|id| id as u64);
284 }
285
286 trace.thread_name = self._get_python_thread_name(python_thread_id);
287 trace.owns_gil = owns_gil;
288 trace.pid = self.process.pid;
289
290 trace.active = true;
292 if let Some(id) = trace.os_thread_id {
293 let id = id as Tid;
294 if let Some(active) = thread_activity.get(&id as _) {
295 trace.active = *active;
296 }
297 }
298
299 if trace.active {
306 trace.active = !self._heuristic_is_thread_idle(&trace);
307 }
308
309 #[cfg(feature = "unwind")]
311 {
312 if self.config.native {
313 if let Some(native) = self.native.as_mut() {
314 let thread_id = trace
315 .os_thread_id
316 .ok_or_else(|| format_err!("failed to get os threadid"))?;
317 let os_thread = remoteprocess::Thread::new(thread_id as Tid)?;
318 trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
319 }
320 }
321 }
322
323 for frame in &mut trace.frames {
324 frame.short_filename = self.shorten_filename(&frame.filename);
325 if let Some(locals) = frame.locals.as_mut() {
326 let max_length = (128 * self.config.dump_locals) as isize;
327 for local in locals {
328 let repr = format_variable::<I, Process>(
329 &self.process,
330 &self.version,
331 local.addr,
332 max_length,
333 );
334 local.repr = Some(repr.unwrap_or_else(|_| "?".to_owned()));
335 }
336 }
337 }
338
339 traces.push(trace);
340
341 if traces.len() > 4096 {
343 return Err(format_err!("Max thread recursion depth reached"));
344 }
345
346 if self.config.gil_only {
347 break;
350 }
351 }
352 Ok(traces)
353 }
354
355 fn _heuristic_is_thread_idle(&self, trace: &StackTrace) -> bool {
358 let frames = &trace.frames;
359 if frames.is_empty() {
360 false
363 } else {
364 let frame = &frames[0];
365 (frame.name == "wait" && frame.filename.ends_with("threading.py"))
366 || (frame.name == "select" && frame.filename.ends_with("selectors.py"))
367 || (frame.name == "poll"
368 && (frame.filename.ends_with("asyncore.py")
369 || frame.filename.contains("zmq")
370 || frame.filename.contains("gevent")
371 || frame.filename.contains("tornado")))
372 }
373 }
374
/// Windows: the Python thread id is used directly as the OS thread id (cast to `Tid`).
///
/// `_interp_head` is unused on this platform; the call never fails.
#[cfg(windows)]
fn _get_os_thread_id<I: InterpreterState>(
    &mut self,
    python_thread_id: u64,
    _interp_head: *const I::ThreadState,
) -> Result<Option<Tid>, Error> {
    let os_thread_id = python_thread_id as Tid;
    Ok(Some(os_thread_id))
}
383
/// macOS: maps a Python thread id to an OS thread id through `python_thread_ids`.
///
/// On a cache miss every thread of the process is re-registered in the cache, keyed by
/// its thread handle minus 224, and the lookup is repeated. Returns `Ok(None)` when the
/// id is still unknown afterwards. `_interp_head` is unused on this platform.
///
/// # Errors
///
/// Fails when the threads of the process, a thread handle or a thread id cannot be read.
#[cfg(target_os = "macos")]
fn _get_os_thread_id<I: InterpreterState>(
    &mut self,
    python_thread_id: u64,
    _interp_head: *const I::ThreadState,
) -> Result<Option<Tid>, Error> {
    if let Some(cached) = self.python_thread_ids.get(&python_thread_id) {
        return Ok(Some(*cached));
    }

    for os_thread in self.process.threads()?.iter() {
        // NOTE(review): 224 is a magic offset - presumably the distance between the
        // thread handle and the id Python reports for the thread; confirm it holds for
        // all supported macOS versions.
        let python_id = os_thread.thread_handle()? - 224;
        self.python_thread_ids.insert(python_id, os_thread.id()?);
    }

    Ok(self.python_thread_ids.get(&python_thread_id).copied())
}
407
408 #[cfg(all(target_os = "linux", not(feature = "unwind")))]
409 fn _get_os_thread_id<I: InterpreterState>(
410 &mut self,
411 _python_thread_id: u64,
412 _interp_head: *const I::ThreadState,
413 ) -> Result<Option<Tid>, Error> {
414 Ok(None)
415 }
416
/// Linux with the `unwind` feature: maps a Python thread id to an OS thread id.
///
/// Results are cached in `python_thread_ids`. On a cache miss the Python thread list
/// starting at `interp_head` is walked to collect every Python thread id, and each OS
/// thread not yet present in the cache is unwound with `_get_pthread_id` to find the
/// Python thread id it belongs to. If the OS thread whose id equals the pid was not
/// already in the cache and exactly one Python thread id remains unmatched, the two are
/// paired up.
///
/// Returns `Ok(None)` without doing any work in non-blocking mode or for dockerized
/// targets, and also when the id cannot be resolved.
///
/// # Errors
///
/// Fails when a thread state cannot be copied, or when the unwinder or the thread list
/// of the process cannot be obtained. A failure to unwind an individual thread is only
/// logged.
#[cfg(all(target_os = "linux", feature = "unwind"))]
fn _get_os_thread_id<I: InterpreterState>(
    &mut self,
    python_thread_id: u64,
    interp_head: *const I::ThreadState,
) -> Result<Option<Tid>, Error> {
    // This lookup is not attempted in non-blocking mode or for dockerized targets.
    if self.config.blocking == LockingStrategy::NonBlocking || self.dockerized {
        return Ok(None);
    }

    if let Some(cached) = self.python_thread_ids.get(&python_thread_id) {
        return Ok(Some(*cached));
    }

    // Collect the ids of every Python thread known to the interpreter.
    let mut all_python_threads = HashSet::new();
    let mut next_thread = interp_head;
    while !next_thread.is_null() {
        let thread_state = self
            .process
            .copy_pointer(next_thread)
            .context("Failed to copy PyThreadState")?;
        all_python_threads.insert(thread_state.thread_id());
        next_thread = thread_state.next();
    }

    // OS threads that already have a cache entry do not need to be unwound again.
    let processed_os_threads: HashSet<Tid> =
        self.python_thread_ids.values().copied().collect();

    let unwinder = self.process.unwinder()?;

    for os_thread in self.process.threads()?.iter() {
        let os_tid = os_thread.id()?;
        if processed_os_threads.contains(&os_tid) {
            continue;
        }

        match self._get_pthread_id(&unwinder, os_thread, &all_python_threads) {
            // 0 means no Python thread id was found for this OS thread.
            Ok(0) => {}
            Ok(pthread_id) => {
                self.python_thread_ids.insert(pthread_id, os_tid);
            }
            Err(e) => {
                warn!("Failed to get get_pthread_id for {}: {}", os_tid, e);
            }
        }
    }

    // If the thread whose id equals the pid was not cached before this call, give it the
    // single Python thread id that is still unmatched - when there is exactly one.
    if !processed_os_threads.contains(&self.pid) {
        let unmatched: Vec<u64> = all_python_threads
            .iter()
            .copied()
            .filter(|id| !self.python_thread_ids.contains_key(id))
            .collect();

        if let [only] = unmatched.as_slice() {
            self.python_thread_ids.insert(*only, self.pid);
        } else {
            warn!("failed to get python threadid for main thread!");
        }
    }

    match self.python_thread_ids.get(&python_thread_id) {
        Some(found) => Ok(Some(*found)),
        None => {
            info!("failed looking up python threadid for {}. known python_thread_ids {:?}. all_python_threads {:?}",
                python_thread_id, self.python_thread_ids, all_python_threads);
            Ok(None)
        }
    }
}
501
/// Unwinds `thread` and looks for one of the known Python thread ids in a register.
///
/// For every frame produced by the unwinder, one architecture specific register is read
/// (`bx` on x86_64, `r5` on arm, register 19 on aarch64). Whenever its value is non-zero
/// and contained in `threadids` it is remembered, so the value found in the *last*
/// matching frame wins.
///
/// Returns `0` when no frame yielded a known id.
///
/// # Errors
///
/// Fails when an unwind cursor for `thread` cannot be created. Registers that cannot be
/// read are skipped.
#[cfg(all(target_os = "linux", feature = "unwind"))]
pub fn _get_pthread_id(
    &self,
    unwinder: &remoteprocess::Unwinder,
    thread: &remoteprocess::Thread,
    threadids: &HashSet<u64>,
) -> Result<u64, Error> {
    let mut pthread_id = 0;

    let mut cursor = unwinder.cursor(thread)?;
    // Only the cursor position matters here; the value yielded for each frame is unused.
    while cursor.next().is_some() {
        #[cfg(target_arch = "x86_64")]
        let possible_threadid = cursor.bx();
        #[cfg(target_arch = "arm")]
        let possible_threadid = cursor.r5();
        // SAFETY: NOTE(review) - presumably sound because 19 is a valid general purpose
        // register number on aarch64 and the cursor is positioned on a frame; confirm
        // against the contract of `register`.
        #[cfg(target_arch = "aarch64")]
        let possible_threadid = unsafe { cursor.register(19) };

        if let Ok(reg) = possible_threadid {
            if reg != 0 && threadids.contains(&reg) {
                pthread_id = reg;
            }
        }
    }

    Ok(pthread_id)
}
531
/// FreeBSD: no OS thread id lookup is available.
///
/// Both arguments are ignored and the result is always `Ok(None)`.
#[cfg(target_os = "freebsd")]
fn _get_os_thread_id<I: InterpreterState>(
    &mut self,
    _python_thread_id: u64,
    _interp_head: *const I::ThreadState,
) -> Result<Option<Tid>, Error> {
    let unavailable: Option<Tid> = None;
    Ok(unavailable)
}
540
541 fn _get_python_thread_name(&mut self, python_thread_id: u64) -> Option<String> {
542 match self.python_thread_names.get(&python_thread_id) {
543 Some(thread_name) => Some(thread_name.clone()),
544 None => {
545 self.python_thread_names = thread_name_lookup(self).unwrap_or_default();
546 self.python_thread_names.get(&python_thread_id).cloned()
547 }
548 }
549 }
550
551 fn shorten_filename(&mut self, filename: &str) -> Option<String> {
555 if self.config.full_filenames {
557 return Some(filename.to_string());
558 }
559
560 if let Some(short) = self.short_filenames.get(filename) {
562 return short.clone();
563 }
564
565 #[cfg(target_os = "linux")]
567 let filename_storage;
568
569 #[cfg(target_os = "linux")]
570 let filename = if self.dockerized {
571 filename_storage = format!("/proc/{}/root{}", self.pid, filename);
572 if Path::new(&filename_storage).exists() {
573 &filename_storage
574 } else {
575 filename
576 }
577 } else {
578 filename
579 };
580
581 let mut path = Path::new(filename);
583 while let Some(parent) = path.parent() {
584 path = parent;
585 if !parent.join("__init__.py").exists() {
586 break;
587 }
588 }
589
590 let shortened = Path::new(filename)
592 .strip_prefix(path)
593 .ok()
594 .map(|p| p.to_string_lossy().to_string());
595
596 self.short_filenames
597 .insert(filename.to_owned(), shortened.clone());
598 shortened
599 }
600}