1use std::collections::HashMap;
2#[cfg(all(target_os = "linux", feature = "unwind"))]
3use std::collections::HashSet;
4#[cfg(all(target_os = "linux", feature = "unwind"))]
5use std::iter::FromIterator;
6use std::path::Path;
7
8use anyhow::{Context, Error, Result};
9use remoteprocess::{Pid, Process, ProcessMemory, Tid};
10
11use crate::config::{Config, LockingStrategy};
12#[cfg(feature = "unwind")]
13use crate::native_stack_trace::NativeStack;
14use crate::python_bindings::{
15 v2_7_15, v3_10_0, v3_11_0, v3_12_0, v3_13_0, v3_3_7, v3_5_5, v3_6_6, v3_7_0, v3_8_0, v3_9_5,
16};
17use crate::python_data_access::format_variable;
18use crate::python_interpreters::{InterpreterState, ThreadState};
19use crate::python_process_info::{
20 get_interpreter_address, get_python_version, get_threadstate_address, PythonProcessInfo,
21};
22use crate::python_threading::thread_name_lookup;
23use crate::stack_trace::{get_gil_threadid, get_stack_trace, StackTrace};
24use crate::version::Version;
25
/// Handle on a running Python process from which Python stack traces are sampled.
///
/// Instances are built with `PythonSpy::new` or `PythonSpy::retry_new`; the
/// remaining fields are caches that are filled lazily while sampling.
pub struct PythonSpy {
    /// Process id of the target, as passed to the constructor.
    pub pid: Pid,
    /// `remoteprocess` handle used to read memory, enumerate threads and lock the target.
    pub process: Process,
    /// Python version detected in the target; selects the bindings used by `get_stack_traces`.
    pub version: Version,
    /// Address produced by `get_interpreter_address` during construction.
    pub interpreter_address: usize,
    /// Address produced by `get_threadstate_address`; handed to `get_gil_threadid` when sampling.
    pub threadstate_address: usize,
    /// Copy of the configuration supplied at construction time.
    pub config: Config,
    /// Native stack support; `Some` only when `config.native` was set at construction.
    #[cfg(feature = "unwind")]
    pub native: Option<NativeStack>,
    /// Cache of the results computed by `shorten_filename`.
    pub short_filenames: HashMap<String, Option<String>>,
    /// Cache mapping Python thread ids to OS thread ids.
    pub python_thread_ids: HashMap<u64, Tid>,
    /// Cache mapping Python thread ids to thread names (filled from `thread_name_lookup`).
    pub python_thread_names: HashMap<u64, String>,
    /// Copied from `PythonProcessInfo::dockerized`; when set, filenames are resolved
    /// through `/proc/<pid>/root` and OS thread id lookup is skipped.
    #[cfg(target_os = "linux")]
    pub dockerized: bool,
}
42
43impl PythonSpy {
44 pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
46 let process = remoteprocess::Process::new(pid)
47 .context("Failed to open process - check if it is running.")?;
48
49 let python_info = PythonProcessInfo::new(&process)?;
51
52 #[cfg(target_os = "freebsd")]
56 let _lock = process.lock();
57
58 let version = get_python_version(&python_info, &process)?;
59 info!("python version {} detected", version);
60
61 let interpreter_address = get_interpreter_address(&python_info, &process, &version)?;
62 info!("Found interpreter at 0x{:016x}", interpreter_address);
63
64 let threadstate_address = get_threadstate_address(
66 interpreter_address,
67 &python_info,
68 &process,
69 &version,
70 config,
71 )?;
72
73 #[cfg(feature = "unwind")]
74 let native = if config.native {
75 Some(NativeStack::new(
76 pid,
77 python_info.python_binary,
78 python_info.libpython_binary,
79 )?)
80 } else {
81 None
82 };
83
84 Ok(PythonSpy {
85 pid,
86 process,
87 version,
88 interpreter_address,
89 threadstate_address,
90 #[cfg(feature = "unwind")]
91 native,
92 #[cfg(target_os = "linux")]
93 dockerized: python_info.dockerized,
94 config: config.clone(),
95 short_filenames: HashMap::new(),
96 python_thread_ids: HashMap::new(),
97 python_thread_names: HashMap::new(),
98 })
99 }
100
101 pub fn retry_new(pid: Pid, config: &Config, max_retries: u64) -> Result<PythonSpy, Error> {
105 let mut retries = 0;
106 loop {
107 let err = match PythonSpy::new(pid, config) {
108 Ok(mut process) => {
109 match process.get_stack_traces() {
111 Ok(_) => return Ok(process),
112 Err(err) => err,
113 }
114 }
115 Err(err) => err,
116 };
117
118 retries += 1;
120 if retries >= max_retries {
121 return Err(err);
122 }
123 info!("Failed to connect to process, retrying. Error: {}", err);
124 std::thread::sleep(std::time::Duration::from_millis(20));
125 }
126 }
127
128 pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, Error> {
130 match self.version {
131 Version {
133 major: 2,
134 minor: 3..=7,
135 ..
136 } => self._get_stack_traces::<v2_7_15::_is>(),
137 Version {
138 major: 3, minor: 3, ..
139 } => self._get_stack_traces::<v3_3_7::_is>(),
140 Version {
142 major: 3, minor: 4, ..
143 } => self._get_stack_traces::<v3_5_5::_is>(),
144 Version {
145 major: 3, minor: 5, ..
146 } => self._get_stack_traces::<v3_5_5::_is>(),
147 Version {
148 major: 3, minor: 6, ..
149 } => self._get_stack_traces::<v3_6_6::_is>(),
150 Version {
151 major: 3, minor: 7, ..
152 } => self._get_stack_traces::<v3_7_0::_is>(),
153 Version {
155 major: 3,
156 minor: 8,
157 patch: 0,
158 ..
159 } => match self.version.release_flags.as_ref() {
160 "a1" | "a2" | "a3" => self._get_stack_traces::<v3_7_0::_is>(),
161 _ => self._get_stack_traces::<v3_8_0::_is>(),
162 },
163 Version {
164 major: 3, minor: 8, ..
165 } => self._get_stack_traces::<v3_8_0::_is>(),
166 Version {
167 major: 3, minor: 9, ..
168 } => self._get_stack_traces::<v3_9_5::_is>(),
169 Version {
170 major: 3,
171 minor: 10,
172 ..
173 } => self._get_stack_traces::<v3_10_0::_is>(),
174 Version {
175 major: 3,
176 minor: 11,
177 ..
178 } => self._get_stack_traces::<v3_11_0::_is>(),
179 Version {
180 major: 3,
181 minor: 12,
182 ..
183 } => self._get_stack_traces::<v3_12_0::_is>(),
184 Version {
185 major: 3,
186 minor: 13,
187 ..
188 } => self._get_stack_traces::<v3_13_0::_is>(),
189 _ => Err(format_err!(
190 "Unsupported version of Python: {}",
191 self.version
192 )),
193 }
194 }
195
196 fn _get_stack_traces<I: InterpreterState>(&mut self) -> Result<Vec<StackTrace>, Error> {
198 let mut thread_activity = HashMap::new();
200 if self.config.gil_only {
201 } else {
204 for thread in self.process.threads()?.iter() {
205 let threadid: Tid = thread.id()?;
206 let Ok(active) = thread.active() else {
207 continue;
210 };
211 thread_activity.insert(threadid, active);
212 }
213 }
214
215 let _lock = if self.config.blocking == LockingStrategy::Lock {
220 Some(self.process.lock().context("Failed to suspend process")?)
221 } else {
222 None
223 };
224
225 let threadstate_ptr_ptr = I::threadstate_ptr_ptr(self.interpreter_address);
227 let threads_head = self
228 .process
229 .copy_pointer(threadstate_ptr_ptr)
230 .context("Failed to copy PyThreadState head pointer")?;
231
232 let gil_thread_id = get_gil_threadid::<I, Process>(self.threadstate_address, &self.process)
234 .context("failed to get gil_thread_id")?;
235
236 let mut traces = Vec::new();
237 let mut threads = threads_head;
238 while !threads.is_null() {
239 let thread = self
241 .process
242 .copy_pointer(threads)
243 .context("Failed to copy PyThreadState")?;
244 threads = thread.next();
245
246 let python_thread_id = thread.thread_id();
247 let owns_gil = python_thread_id == gil_thread_id;
248
249 if self.config.gil_only && !owns_gil {
250 continue;
251 }
252
253 let mut trace = get_stack_trace(
254 &thread,
255 &self.process,
256 self.config.dump_locals > 0,
257 self.config.lineno,
258 )?;
259
260 if trace.os_thread_id.is_none() {
266 let mut os_thread_id =
267 self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
268
269 if let Some(tid) = os_thread_id {
272 if !thread_activity.is_empty() && !thread_activity.contains_key(&tid) {
273 info!("clearing away thread id caches, thread {} has exited", tid);
274 self.python_thread_ids.clear();
275 self.python_thread_names.clear();
276 os_thread_id =
277 self._get_os_thread_id::<I>(python_thread_id, threads_head)?;
278 }
279 }
280
281 trace.os_thread_id = os_thread_id.map(|id| id as u64);
282 }
283
284 trace.thread_name = self._get_python_thread_name(python_thread_id);
285 trace.owns_gil = owns_gil;
286 trace.pid = self.process.pid;
287
288 trace.active = true;
290 if let Some(id) = trace.os_thread_id {
291 let id = id as Tid;
292 if let Some(active) = thread_activity.get(&id as _) {
293 trace.active = *active;
294 }
295 }
296
297 if trace.active {
304 trace.active = !self._heuristic_is_thread_idle(&trace);
305 }
306
307 #[cfg(feature = "unwind")]
309 {
310 if self.config.native {
311 if let Some(native) = self.native.as_mut() {
312 let thread_id = trace
313 .os_thread_id
314 .ok_or_else(|| format_err!("failed to get os threadid"))?;
315 let os_thread = remoteprocess::Thread::new(thread_id as Tid)?;
316 trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
317 }
318 }
319 }
320
321 for frame in &mut trace.frames {
322 frame.short_filename = self.shorten_filename(&frame.filename);
323 if let Some(locals) = frame.locals.as_mut() {
324 let max_length = (128 * self.config.dump_locals) as isize;
325 for local in locals {
326 let repr = format_variable::<I, Process>(
327 &self.process,
328 &self.version,
329 local.addr,
330 max_length,
331 );
332 local.repr = Some(repr.unwrap_or_else(|_| "?".to_owned()));
333 }
334 }
335 }
336
337 traces.push(trace);
338
339 if traces.len() > 4096 {
341 return Err(format_err!("Max thread recursion depth reached"));
342 }
343
344 if self.config.gil_only {
345 break;
348 }
349 }
350 Ok(traces)
351 }
352
353 fn _heuristic_is_thread_idle(&self, trace: &StackTrace) -> bool {
356 let frames = &trace.frames;
357 if frames.is_empty() {
358 false
361 } else {
362 let frame = &frames[0];
363 (frame.name == "wait" && frame.filename.ends_with("threading.py"))
364 || (frame.name == "select" && frame.filename.ends_with("selectors.py"))
365 || (frame.name == "poll"
366 && (frame.filename.ends_with("asyncore.py")
367 || frame.filename.contains("zmq")
368 || frame.filename.contains("gevent")
369 || frame.filename.contains("tornado")))
370 }
371 }
372
    /// Windows variant of the OS thread id lookup.
    ///
    /// The Python thread id is cast to `Tid` and returned as-is; `_interp_head`
    /// is unused. This variant never fails and never returns `None`.
    #[cfg(windows)]
    fn _get_os_thread_id<I: InterpreterState>(
        &mut self,
        python_thread_id: u64,
        _interp_head: *const I::ThreadState,
    ) -> Result<Option<Tid>, Error> {
        Ok(Some(python_thread_id as Tid))
    }
381
382 #[cfg(target_os = "macos")]
383 fn _get_os_thread_id<I: InterpreterState>(
384 &mut self,
385 python_thread_id: u64,
386 _interp_head: *const I::ThreadState,
387 ) -> Result<Option<Tid>, Error> {
388 if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
390 return Ok(Some(*thread_id));
391 }
392
393 for thread in self.process.threads()?.iter() {
394 let current_handle = thread.thread_handle()? - 224;
397 self.python_thread_ids.insert(current_handle, thread.id()?);
398 }
399
400 if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
401 return Ok(Some(*thread_id));
402 }
403 Ok(None)
404 }
405
    /// Linux variant of the OS thread id lookup used when the `unwind` feature
    /// is disabled.
    ///
    /// No lookup is performed: both arguments are ignored and `Ok(None)` is
    /// always returned.
    #[cfg(all(target_os = "linux", not(feature = "unwind")))]
    fn _get_os_thread_id<I: InterpreterState>(
        &mut self,
        _python_thread_id: u64,
        _interp_head: *const I::ThreadState,
    ) -> Result<Option<Tid>, Error> {
        Ok(None)
    }
414
415 #[cfg(all(target_os = "linux", feature = "unwind"))]
416 fn _get_os_thread_id<I: InterpreterState>(
417 &mut self,
418 python_thread_id: u64,
419 interp_head: *const I::ThreadState,
420 ) -> Result<Option<Tid>, Error> {
421 if self.config.blocking == LockingStrategy::NonBlocking {
424 return Ok(None);
425 }
426
427 if self.dockerized {
429 return Ok(None);
430 }
431
432 if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
434 return Ok(Some(*thread_id));
435 }
436
437 let mut all_python_threads = HashSet::new();
439 let mut threads = interp_head;
440 while !threads.is_null() {
441 let thread = self
442 .process
443 .copy_pointer(threads)
444 .context("Failed to copy PyThreadState")?;
445 let current = thread.thread_id();
446 all_python_threads.insert(current);
447 threads = thread.next();
448 }
449
450 let processed_os_threads: HashSet<Tid> =
451 HashSet::from_iter(self.python_thread_ids.values().copied());
452
453 let unwinder = self.process.unwinder()?;
454
455 for thread in self.process.threads()?.iter() {
457 let threadid = thread.id()?;
458 if processed_os_threads.contains(&threadid) {
459 continue;
460 }
461
462 match self._get_pthread_id(&unwinder, thread, &all_python_threads) {
463 Ok(pthread_id) => {
464 if pthread_id != 0 {
465 self.python_thread_ids.insert(pthread_id, threadid);
466 }
467 }
468 Err(e) => {
469 warn!("Failed to get get_pthread_id for {}: {}", threadid, e);
470 }
471 };
472 }
473
474 if !processed_os_threads.contains(&self.pid) {
477 let mut unknown_python_threadids = HashSet::new();
478 for python_thread_id in all_python_threads.iter() {
479 if !self.python_thread_ids.contains_key(python_thread_id) {
480 unknown_python_threadids.insert(*python_thread_id);
481 }
482 }
483
484 if unknown_python_threadids.len() == 1 {
485 let python_thread_id = *unknown_python_threadids.iter().next().unwrap();
486 self.python_thread_ids.insert(python_thread_id, self.pid);
487 } else {
488 warn!("failed to get python threadid for main thread!");
489 }
490 }
491
492 if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
493 return Ok(Some(*thread_id));
494 }
495 info!("failed looking up python threadid for {}. known python_thread_ids {:?}. all_python_threads {:?}",
496 python_thread_id, self.python_thread_ids, all_python_threads);
497 Ok(None)
498 }
499
500 #[cfg(all(target_os = "linux", feature = "unwind"))]
501 pub fn _get_pthread_id(
502 &self,
503 unwinder: &remoteprocess::Unwinder,
504 thread: &remoteprocess::Thread,
505 threadids: &HashSet<u64>,
506 ) -> Result<u64, Error> {
507 let mut pthread_id = 0;
508
509 let mut cursor = unwinder.cursor(thread)?;
510 while let Some(_) = cursor.next() {
511 #[cfg(target_arch = "x86_64")]
515 let possible_threadid = cursor.bx();
516 #[cfg(target_arch = "arm")]
517 let possible_threadid = cursor.r5();
518 if let Ok(reg) = possible_threadid {
519 if reg != 0 && threadids.contains(®) {
520 pthread_id = reg;
521 }
522 }
523 }
524
525 Ok(pthread_id)
526 }
527
    /// FreeBSD variant of the OS thread id lookup.
    ///
    /// No lookup is performed: both arguments are ignored and `Ok(None)` is
    /// always returned.
    #[cfg(target_os = "freebsd")]
    fn _get_os_thread_id<I: InterpreterState>(
        &mut self,
        _python_thread_id: u64,
        _interp_head: *const I::ThreadState,
    ) -> Result<Option<Tid>, Error> {
        Ok(None)
    }
536
537 fn _get_python_thread_name(&mut self, python_thread_id: u64) -> Option<String> {
538 match self.python_thread_names.get(&python_thread_id) {
539 Some(thread_name) => Some(thread_name.clone()),
540 None => {
541 self.python_thread_names = thread_name_lookup(self).unwrap_or_default();
542 self.python_thread_names.get(&python_thread_id).cloned()
543 }
544 }
545 }
546
547 fn shorten_filename(&mut self, filename: &str) -> Option<String> {
551 if self.config.full_filenames {
553 return Some(filename.to_string());
554 }
555
556 if let Some(short) = self.short_filenames.get(filename) {
558 return short.clone();
559 }
560
561 #[cfg(target_os = "linux")]
563 let filename_storage;
564
565 #[cfg(target_os = "linux")]
566 let filename = if self.dockerized {
567 filename_storage = format!("/proc/{}/root{}", self.pid, filename);
568 if Path::new(&filename_storage).exists() {
569 &filename_storage
570 } else {
571 filename
572 }
573 } else {
574 filename
575 };
576
577 let mut path = Path::new(filename);
579 while let Some(parent) = path.parent() {
580 path = parent;
581 if !parent.join("__init__.py").exists() {
582 break;
583 }
584 }
585
586 let shortened = Path::new(filename)
588 .strip_prefix(path)
589 .ok()
590 .map(|p| p.to_string_lossy().to_string());
591
592 self.short_filenames
593 .insert(filename.to_owned(), shortened.clone());
594 shortened
595 }
596}