1#![allow(clippy::type_complexity)]
2
3use std::collections::HashMap;
4use std::sync::mpsc::{self, Receiver, Sender};
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use anyhow::Error;
10
11use remoteprocess::Pid;
12
13use crate::config::Config;
14use crate::python_spy::PythonSpy;
15use crate::stack_trace::{ProcessInfo, StackTrace};
16use crate::timer::Timer;
17use crate::version::Version;
18
/// Handle to a background sampling thread; iterating over it yields [`Sample`]s
/// received from that thread.
pub struct Sampler {
    /// Python version reported by the spy. The single-process sampler fills
    /// this in; the subprocess sampler leaves it as `None`.
    pub version: Option<Version>,
    /// Receiving end of the sample channel. Reset to `None` when the sampler
    /// is dropped so that the sampling thread's sends start failing.
    rx: Option<Receiver<Sample>>,
    /// Join handle of the sampling thread; taken and joined when the sampler
    /// is dropped.
    sampling_thread: Option<thread::JoinHandle<()>>,
}
24
/// The result of one sampling round, as sent by the sampling thread.
pub struct Sample {
    /// Stack traces collected during this round.
    pub traces: Vec<StackTrace>,
    /// Errors hit while sampling, paired with the pid they belong to;
    /// `None` when no error was recorded.
    pub sampling_errors: Option<Vec<(Pid, Error)>>,
    /// The `Err` side of the timer tick for this round (`sleep.err()`).
    /// NOTE(review): presumably how far behind schedule the sample was taken —
    /// confirm against `Timer`.
    pub late: Option<Duration>,
}
30
31impl Sampler {
32 pub fn new(pid: Pid, config: &Config) -> Result<Sampler, Error> {
33 if config.subprocesses {
34 Self::new_subprocess_sampler(pid, config)
35 } else {
36 Self::new_sampler(pid, config)
37 }
38 }
39
40 fn new_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
42 let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
43 let (initialized_tx, initialized_rx): (
44 Sender<Result<Version, Error>>,
45 Receiver<Result<Version, Error>>,
46 ) = mpsc::channel();
47 let config = config.clone();
48 let sampling_thread = thread::spawn(move || {
49 let mut spy = match PythonSpy::retry_new(pid, &config, 20) {
52 Ok(spy) => {
53 if initialized_tx.send(Ok(spy.version.clone())).is_err() {
54 return;
55 }
56 spy
57 }
58 Err(e) => {
59 initialized_tx.send(Err(e)).unwrap();
60 return;
61 }
62 };
63
64 for sleep in Timer::new(spy.config.sampling_rate as f64) {
65 let mut sampling_errors = None;
66 let traces = match spy.get_stack_traces() {
67 Ok(traces) => traces,
68 Err(e) => {
69 if spy.process.exe().is_err() {
70 info!(
71 "stopped sampling pid {} because the process exited",
72 spy.pid
73 );
74 break;
75 }
76 sampling_errors = Some(vec![(spy.pid, e)]);
77 Vec::new()
78 }
79 };
80
81 let late = sleep.err();
82 if tx
83 .send(Sample {
84 traces,
85 sampling_errors,
86 late,
87 })
88 .is_err()
89 {
90 break;
91 }
92 }
93 });
94
95 let version = initialized_rx.recv()??;
96 Ok(Sampler {
97 rx: Some(rx),
98 version: Some(version),
99 sampling_thread: Some(sampling_thread),
100 })
101 }
102
103 fn new_subprocess_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
106 let process = remoteprocess::Process::new(pid)?;
107
108 let mut spies = HashMap::new();
110 let mut retries = 10;
111 spies.insert(pid, PythonSpyThread::new(pid, None, config)?);
112
113 loop {
114 for (childpid, parentpid) in process.child_processes()? {
115 match PythonSpyThread::new(childpid, Some(parentpid), config) {
118 Ok(spy) => {
119 spies.insert(childpid, spy);
120 }
121 Err(e) => {
122 warn!("Failed to open process {}: {}", childpid, e);
123 }
124 }
125 }
126
127 if spies.values_mut().any(|spy| spy.wait_initialized()) {
130 break;
131 }
132
133 retries -= 1;
135 if retries == 0 {
136 return Err(format_err!(
137 "No python processes found in process {} or any of its subprocesses",
138 pid
139 ));
140 }
141 std::thread::sleep(std::time::Duration::from_millis(100));
142 }
143
144 let spies = Arc::new(Mutex::new(spies));
147 let monitor_spies = spies.clone();
148 let monitor_config = config.clone();
149 std::thread::spawn(move || {
150 while process.exe().is_ok() {
151 match monitor_spies.lock() {
152 Ok(mut spies) => {
153 for (childpid, parentpid) in process
154 .child_processes()
155 .expect("failed to get subprocesses")
156 {
157 if spies.contains_key(&childpid) {
158 continue;
159 }
160 match PythonSpyThread::new(childpid, Some(parentpid), &monitor_config) {
161 Ok(spy) => {
162 spies.insert(childpid, spy);
163 }
164 Err(e) => {
165 warn!("Failed to create spy for {}: {}", childpid, e);
166 }
167 }
168 }
169 }
170 Err(e) => {
171 error!("Failed to acquire lock: {}", e);
172 }
173 }
174 std::thread::sleep(Duration::from_millis(100));
175 }
176 });
177
178 let mut process_info = HashMap::new();
179
180 let config = config.clone();
182 let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
183 let sampling_thread = std::thread::spawn(move || {
184 for sleep in Timer::new(config.sampling_rate as f64) {
185 let mut traces = Vec::new();
186 let mut sampling_errors = None;
187
188 let mut spies = match spies.lock() {
189 Ok(current) => current,
190 Err(e) => {
191 error!("Failed to get process tree: {}", e);
192 continue;
193 }
194 };
195
196 for spy in spies.values_mut() {
198 if spy.initialized() {
199 spy.notify();
200 }
201 }
202
203 for spy in spies.values_mut() {
205 match spy.collect() {
206 Some(Ok(mut t)) => traces.append(&mut t),
207 Some(Err(e)) => {
208 let errors = sampling_errors.get_or_insert_with(Vec::new);
209 errors.push((spy.process.pid, e));
210 }
211 None => {}
212 }
213 }
214
215 for trace in traces.iter_mut() {
217 let pid = trace.pid;
218 let process = process_info
220 .entry(pid)
221 .or_insert_with(|| get_process_info(pid, &spies).map(|p| Arc::new(*p)));
222 trace.process_info = process.clone();
223 }
224
225 let late = sleep.err();
227 if tx
228 .send(Sample {
229 traces,
230 sampling_errors,
231 late,
232 })
233 .is_err()
234 {
235 break;
236 }
237
238 if spies.len() == 0 || spies.values().all(|x| !x.running) {
240 break;
241 }
242 }
243 });
244
245 Ok(Sampler {
246 rx: Some(rx),
247 version: None,
248 sampling_thread: Some(sampling_thread),
249 })
250 }
251}
252
253impl Iterator for Sampler {
254 type Item = Sample;
255 fn next(&mut self) -> Option<Self::Item> {
256 self.rx.as_ref().unwrap().recv().ok()
257 }
258}
259
260impl Drop for Sampler {
261 fn drop(&mut self) {
262 self.rx = None;
263 if let Some(t) = self.sampling_thread.take() {
264 t.join().unwrap();
265 }
266 }
267}
268
/// Owner-side handle for a worker thread that runs a `PythonSpy` against one
/// process and is driven over channels.
struct PythonSpyThread {
    /// Receives the one-off initialization result from the worker thread.
    initialized_rx: Receiver<Result<Version, Error>>,
    /// Sends "take a sample now" requests to the worker thread.
    notify_tx: Sender<()>,
    /// Receives the result of each requested sample.
    sample_rx: Receiver<Result<Vec<StackTrace>, Error>>,
    /// Cached initialization result; `None` until one has been received.
    initialized: Option<Result<Version, Error>>,
    /// Set once initialization succeeded; cleared when initialization failed
    /// or a channel to the worker thread turned out to be closed.
    pub running: bool,
    /// True while a sample has been requested but not yet collected.
    notified: bool,
    /// Handle to the process being sampled.
    pub process: remoteprocess::Process,
    /// Pid of the parent process, if this spy was created for a child.
    pub parent: Option<Pid>,
    /// Command line of the process joined with spaces; empty if it could not
    /// be read.
    pub command_line: String,
}
280
281impl PythonSpyThread {
282 fn new(pid: Pid, parent: Option<Pid>, config: &Config) -> Result<PythonSpyThread, Error> {
283 let (initialized_tx, initialized_rx): (
284 Sender<Result<Version, Error>>,
285 Receiver<Result<Version, Error>>,
286 ) = mpsc::channel();
287 let (notify_tx, notify_rx): (Sender<()>, Receiver<()>) = mpsc::channel();
288 let (sample_tx, sample_rx): (
289 Sender<Result<Vec<StackTrace>, Error>>,
290 Receiver<Result<Vec<StackTrace>, Error>>,
291 ) = mpsc::channel();
292 let config = config.clone();
293 let process = remoteprocess::Process::new(pid)?;
294 let command_line = process
295 .cmdline()
296 .map(|x| x.join(" "))
297 .unwrap_or_else(|_| "".to_owned());
298
299 thread::spawn(move || {
300 let mut spy = match PythonSpy::retry_new(pid, &config, 5) {
303 Ok(spy) => {
304 if initialized_tx.send(Ok(spy.version.clone())).is_err() {
305 return;
306 }
307 spy
308 }
309 Err(e) => {
310 warn!("Failed to profile python from process {}: {}", pid, e);
311 initialized_tx.send(Err(e)).unwrap();
312 return;
313 }
314 };
315
316 for _ in notify_rx.iter() {
317 let result = spy.get_stack_traces();
318 if result.is_err() && spy.process.exe().is_err() {
319 info!(
320 "stopped sampling pid {} because the process exited",
321 spy.pid
322 );
323 break;
324 }
325 if sample_tx.send(result).is_err() {
326 break;
327 }
328 }
329 });
330 Ok(PythonSpyThread {
331 initialized_rx,
332 notify_tx,
333 sample_rx,
334 process,
335 command_line,
336 parent,
337 initialized: None,
338 running: false,
339 notified: false,
340 })
341 }
342
343 fn wait_initialized(&mut self) -> bool {
344 match self.initialized_rx.recv() {
345 Ok(status) => {
346 self.running = status.is_ok();
347 self.initialized = Some(status);
348 self.running
349 }
350 Err(e) => {
351 warn!(
353 "Failed to get initialization status from PythonSpyThread: {}",
354 e
355 );
356 false
357 }
358 }
359 }
360
361 fn initialized(&mut self) -> bool {
362 if let Some(init) = self.initialized.as_ref() {
363 return init.is_ok();
364 }
365 match self.initialized_rx.try_recv() {
366 Ok(status) => {
367 self.running = status.is_ok();
368 self.initialized = Some(status);
369 self.running
370 }
371 Err(std::sync::mpsc::TryRecvError::Empty) => false,
372 Err(std::sync::mpsc::TryRecvError::Disconnected) => {
373 warn!("Failed to get initialization status from PythonSpyThread: disconnected");
375 false
376 }
377 }
378 }
379
380 fn notify(&mut self) {
381 match self.notify_tx.send(()) {
382 Ok(_) => {
383 self.notified = true;
384 }
385 Err(_) => {
386 self.running = false;
387 }
388 }
389 }
390
391 fn collect(&mut self) -> Option<Result<Vec<StackTrace>, Error>> {
392 if !self.notified {
393 return None;
394 }
395 self.notified = false;
396 match self.sample_rx.recv() {
397 Ok(sample) => Some(sample),
398 Err(_) => {
399 self.running = false;
400 None
401 }
402 }
403 }
404}
405
406fn get_process_info(pid: Pid, spies: &HashMap<Pid, PythonSpyThread>) -> Option<Box<ProcessInfo>> {
407 spies.get(&pid).map(|spy| {
408 let parent = spy
409 .parent
410 .and_then(|parentpid| get_process_info(parentpid, spies));
411 Box::new(ProcessInfo {
412 pid,
413 parent,
414 command_line: spy.command_line.clone(),
415 })
416 })
417}