py_spy/
sampler.rs

1#![allow(clippy::type_complexity)]
2
3use std::collections::HashMap;
4use std::sync::mpsc::{self, Receiver, Sender};
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use anyhow::Error;
10
11use remoteprocess::Pid;
12
13use crate::config::Config;
14use crate::python_spy::PythonSpy;
15use crate::stack_trace::{ProcessInfo, StackTrace};
16use crate::timer::Timer;
17use crate::version::Version;
18
19pub struct Sampler {
20    pub version: Option<Version>,
21    rx: Option<Receiver<Sample>>,
22    sampling_thread: Option<thread::JoinHandle<()>>,
23}
24
25pub struct Sample {
26    pub traces: Vec<StackTrace>,
27    pub sampling_errors: Option<Vec<(Pid, Error)>>,
28    pub late: Option<Duration>,
29}
30
31impl Sampler {
32    pub fn new(pid: Pid, config: &Config) -> Result<Sampler, Error> {
33        if config.subprocesses {
34            Self::new_subprocess_sampler(pid, config)
35        } else {
36            Self::new_sampler(pid, config)
37        }
38    }
39
40    /// Creates a new sampler object, reading from a single process only
41    fn new_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
42        let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
43        let (initialized_tx, initialized_rx): (
44            Sender<Result<Version, Error>>,
45            Receiver<Result<Version, Error>>,
46        ) = mpsc::channel();
47        let config = config.clone();
48        let sampling_thread = thread::spawn(move || {
49            // We need to create this object inside the thread here since PythonSpy objects don't
50            // have the Send trait implemented on linux
51            let mut spy = match PythonSpy::retry_new(pid, &config, 20) {
52                Ok(spy) => {
53                    if initialized_tx.send(Ok(spy.version.clone())).is_err() {
54                        return;
55                    }
56                    spy
57                }
58                Err(e) => {
59                    initialized_tx.send(Err(e)).unwrap();
60                    return;
61                }
62            };
63
64            for sleep in Timer::new(spy.config.sampling_rate as f64) {
65                let mut sampling_errors = None;
66                let traces = match spy.get_stack_traces() {
67                    Ok(traces) => traces,
68                    Err(e) => {
69                        if spy.process.exe().is_err() {
70                            info!(
71                                "stopped sampling pid {} because the process exited",
72                                spy.pid
73                            );
74                            break;
75                        }
76                        sampling_errors = Some(vec![(spy.pid, e)]);
77                        Vec::new()
78                    }
79                };
80
81                let late = sleep.err();
82                if tx
83                    .send(Sample {
84                        traces,
85                        sampling_errors,
86                        late,
87                    })
88                    .is_err()
89                {
90                    break;
91                }
92            }
93        });
94
95        let version = initialized_rx.recv()??;
96        Ok(Sampler {
97            rx: Some(rx),
98            version: Some(version),
99            sampling_thread: Some(sampling_thread),
100        })
101    }
102
103    /// Creates a new sampler object that samples any python process in the
104    /// process or child processes
105    fn new_subprocess_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
106        let process = remoteprocess::Process::new(pid)?;
107
108        // Initialize a PythonSpy object per child, and build up the process tree
109        let mut spies = HashMap::new();
110        let mut retries = 10;
111        spies.insert(pid, PythonSpyThread::new(pid, None, config)?);
112
113        loop {
114            for (childpid, parentpid) in process.child_processes()? {
115                // If we can't create the child process, don't worry about it
116                // can happen with zombie child processes etc
117                match PythonSpyThread::new(childpid, Some(parentpid), config) {
118                    Ok(spy) => {
119                        spies.insert(childpid, spy);
120                    }
121                    Err(e) => {
122                        warn!("Failed to open process {}: {}", childpid, e);
123                    }
124                }
125            }
126
127            // wait for all the various python spy objects to initialize, and break out of here
128            // if we have one of them started.
129            if spies.values_mut().any(|spy| spy.wait_initialized()) {
130                break;
131            }
132
133            // Otherwise sleep for a short time and retry
134            retries -= 1;
135            if retries == 0 {
136                return Err(format_err!(
137                    "No python processes found in process {} or any of its subprocesses",
138                    pid
139                ));
140            }
141            std::thread::sleep(std::time::Duration::from_millis(100));
142        }
143
144        // Create a new thread to periodically monitor for new child processes, and update
145        // the procesess map
146        let spies = Arc::new(Mutex::new(spies));
147        let monitor_spies = spies.clone();
148        let monitor_config = config.clone();
149        std::thread::spawn(move || {
150            while process.exe().is_ok() {
151                match monitor_spies.lock() {
152                    Ok(mut spies) => {
153                        for (childpid, parentpid) in process
154                            .child_processes()
155                            .expect("failed to get subprocesses")
156                        {
157                            if spies.contains_key(&childpid) {
158                                continue;
159                            }
160                            match PythonSpyThread::new(childpid, Some(parentpid), &monitor_config) {
161                                Ok(spy) => {
162                                    spies.insert(childpid, spy);
163                                }
164                                Err(e) => {
165                                    warn!("Failed to create spy for {}: {}", childpid, e);
166                                }
167                            }
168                        }
169                    }
170                    Err(e) => {
171                        error!("Failed to acquire lock: {}", e);
172                    }
173                }
174                std::thread::sleep(Duration::from_millis(100));
175            }
176        });
177
178        let mut process_info = HashMap::new();
179
180        // Create a new thread to generate samples
181        let config = config.clone();
182        let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
183        let sampling_thread = std::thread::spawn(move || {
184            for sleep in Timer::new(config.sampling_rate as f64) {
185                let mut traces = Vec::new();
186                let mut sampling_errors = None;
187
188                let mut spies = match spies.lock() {
189                    Ok(current) => current,
190                    Err(e) => {
191                        error!("Failed to get process tree: {}", e);
192                        continue;
193                    }
194                };
195
196                // Notify all the initialized spies to generate a trace
197                for spy in spies.values_mut() {
198                    if spy.initialized() {
199                        spy.notify();
200                    }
201                }
202
203                // collect the traces from each python spy if possible
204                for spy in spies.values_mut() {
205                    match spy.collect() {
206                        Some(Ok(mut t)) => traces.append(&mut t),
207                        Some(Err(e)) => {
208                            let errors = sampling_errors.get_or_insert_with(Vec::new);
209                            errors.push((spy.process.pid, e));
210                        }
211                        None => {}
212                    }
213                }
214
215                // Annotate each trace with the process info
216                for trace in traces.iter_mut() {
217                    let pid = trace.pid;
218                    // Annotate each trace with the process info for the current
219                    let process = process_info
220                        .entry(pid)
221                        .or_insert_with(|| get_process_info(pid, &spies).map(|p| Arc::new(*p)));
222                    trace.process_info = process.clone();
223                }
224
225                // Send the collected info back
226                let late = sleep.err();
227                if tx
228                    .send(Sample {
229                        traces,
230                        sampling_errors,
231                        late,
232                    })
233                    .is_err()
234                {
235                    break;
236                }
237
238                // If all of our spies have stopped, we're done
239                if spies.len() == 0 || spies.values().all(|x| !x.running) {
240                    break;
241                }
242            }
243        });
244
245        Ok(Sampler {
246            rx: Some(rx),
247            version: None,
248            sampling_thread: Some(sampling_thread),
249        })
250    }
251}
252
253impl Iterator for Sampler {
254    type Item = Sample;
255    fn next(&mut self) -> Option<Self::Item> {
256        self.rx.as_ref().unwrap().recv().ok()
257    }
258}
259
260impl Drop for Sampler {
261    fn drop(&mut self) {
262        self.rx = None;
263        if let Some(t) = self.sampling_thread.take() {
264            t.join().unwrap();
265        }
266    }
267}
268
269struct PythonSpyThread {
270    initialized_rx: Receiver<Result<Version, Error>>,
271    notify_tx: Sender<()>,
272    sample_rx: Receiver<Result<Vec<StackTrace>, Error>>,
273    initialized: Option<Result<Version, Error>>,
274    pub running: bool,
275    notified: bool,
276    pub process: remoteprocess::Process,
277    pub parent: Option<Pid>,
278    pub command_line: String,
279}
280
281impl PythonSpyThread {
282    fn new(pid: Pid, parent: Option<Pid>, config: &Config) -> Result<PythonSpyThread, Error> {
283        let (initialized_tx, initialized_rx): (
284            Sender<Result<Version, Error>>,
285            Receiver<Result<Version, Error>>,
286        ) = mpsc::channel();
287        let (notify_tx, notify_rx): (Sender<()>, Receiver<()>) = mpsc::channel();
288        let (sample_tx, sample_rx): (
289            Sender<Result<Vec<StackTrace>, Error>>,
290            Receiver<Result<Vec<StackTrace>, Error>>,
291        ) = mpsc::channel();
292        let config = config.clone();
293        let process = remoteprocess::Process::new(pid)?;
294        let command_line = process
295            .cmdline()
296            .map(|x| x.join(" "))
297            .unwrap_or_else(|_| "".to_owned());
298
299        thread::spawn(move || {
300            // We need to create this object inside the thread here since PythonSpy objects don't
301            // have the Send trait implemented on linux
302            let mut spy = match PythonSpy::retry_new(pid, &config, 5) {
303                Ok(spy) => {
304                    if initialized_tx.send(Ok(spy.version.clone())).is_err() {
305                        return;
306                    }
307                    spy
308                }
309                Err(e) => {
310                    warn!("Failed to profile python from process {}: {}", pid, e);
311                    initialized_tx.send(Err(e)).unwrap();
312                    return;
313                }
314            };
315
316            for _ in notify_rx.iter() {
317                let result = spy.get_stack_traces();
318                if result.is_err() && spy.process.exe().is_err() {
319                    info!(
320                        "stopped sampling pid {} because the process exited",
321                        spy.pid
322                    );
323                    break;
324                }
325                if sample_tx.send(result).is_err() {
326                    break;
327                }
328            }
329        });
330        Ok(PythonSpyThread {
331            initialized_rx,
332            notify_tx,
333            sample_rx,
334            process,
335            command_line,
336            parent,
337            initialized: None,
338            running: false,
339            notified: false,
340        })
341    }
342
343    fn wait_initialized(&mut self) -> bool {
344        match self.initialized_rx.recv() {
345            Ok(status) => {
346                self.running = status.is_ok();
347                self.initialized = Some(status);
348                self.running
349            }
350            Err(e) => {
351                // shouldn't happen, but will be ok if it does
352                warn!(
353                    "Failed to get initialization status from PythonSpyThread: {}",
354                    e
355                );
356                false
357            }
358        }
359    }
360
361    fn initialized(&mut self) -> bool {
362        if let Some(init) = self.initialized.as_ref() {
363            return init.is_ok();
364        }
365        match self.initialized_rx.try_recv() {
366            Ok(status) => {
367                self.running = status.is_ok();
368                self.initialized = Some(status);
369                self.running
370            }
371            Err(std::sync::mpsc::TryRecvError::Empty) => false,
372            Err(std::sync::mpsc::TryRecvError::Disconnected) => {
373                // this *shouldn't* happen
374                warn!("Failed to get initialization status from PythonSpyThread: disconnected");
375                false
376            }
377        }
378    }
379
380    fn notify(&mut self) {
381        match self.notify_tx.send(()) {
382            Ok(_) => {
383                self.notified = true;
384            }
385            Err(_) => {
386                self.running = false;
387            }
388        }
389    }
390
391    fn collect(&mut self) -> Option<Result<Vec<StackTrace>, Error>> {
392        if !self.notified {
393            return None;
394        }
395        self.notified = false;
396        match self.sample_rx.recv() {
397            Ok(sample) => Some(sample),
398            Err(_) => {
399                self.running = false;
400                None
401            }
402        }
403    }
404}
405
406fn get_process_info(pid: Pid, spies: &HashMap<Pid, PythonSpyThread>) -> Option<Box<ProcessInfo>> {
407    spies.get(&pid).map(|spy| {
408        let parent = spy
409            .parent
410            .and_then(|parentpid| get_process_info(parentpid, spies));
411        Box::new(ProcessInfo {
412            pid,
413            parent,
414            command_line: spy.command_line.clone(),
415        })
416    })
417}