py_spy_for_datakit/
sampler.rs

1use std::collections::HashMap;
2use std::sync::mpsc::{self, Sender, Receiver};
3use std::sync::{Mutex, Arc};
4use std::time::Duration;
5use std::thread;
6
7use anyhow::Error;
8
9use remoteprocess::Pid;
10
11use crate::timer::Timer;
12use crate::python_spy::PythonSpy;
13use crate::config::Config;
14use crate::stack_trace::{StackTrace, ProcessInfo};
15use crate::version::Version;
16
17pub struct Sampler {
18    pub version: Option<Version>,
19    rx: Option<Receiver<Sample>>,
20    sampling_thread: Option<thread::JoinHandle<()>>,
21}
22
23pub struct Sample {
24    pub traces: Vec<StackTrace>,
25    pub sampling_errors: Option<Vec<(Pid, Error)>>,
26    pub late: Option<Duration>
27}
28
29impl Sampler {
30    pub fn new(pid: Pid, config: &Config) -> Result<Sampler, Error> {
31        if config.subprocesses {
32            Self::new_subprocess_sampler(pid, config)
33        } else {
34            Self::new_sampler(pid, config)
35        }
36    }
37
38    /// Creates a new sampler object, reading from a single process only
39    fn new_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
40        let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
41        let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
42        let config = config.clone();
43        let sampling_thread = thread::spawn(move || {
44            // We need to create this object inside the thread here since PythonSpy objects don't
45            // have the Send trait implemented on linux
46            let mut spy = match PythonSpy::retry_new(pid, &config, 20) {
47                Ok(spy) => {
48                    if let Err(_) = initialized_tx.send(Ok(spy.version.clone())) {
49                        return;
50                    }
51                    spy
52                },
53                Err(e) =>  {
54                    if initialized_tx.send(Err(e)).is_err() {}
55                    return;
56                }
57            };
58
59            for sleep in Timer::new(spy.config.sampling_rate as f64) {
60                let mut sampling_errors = None;
61                let traces = match spy.get_stack_traces() {
62                    Ok(traces) => traces,
63                    Err(e) => {
64                        if spy.process.exe().is_err() {
65                            info!("stopped sampling pid {} because the process exited", spy.pid);
66                            break;
67                        }
68                        sampling_errors = Some(vec![(spy.pid, e)]);
69                        Vec::new()
70                    }
71                };
72
73                let late = sleep.err();
74                if tx.send(Sample{traces: traces, sampling_errors, late}).is_err() {
75                    break;
76                }
77            }
78        });
79
80        let version = initialized_rx.recv()??;
81        Ok(Sampler{rx: Some(rx), version: Some(version), sampling_thread: Some(sampling_thread)})
82    }
83
84    /// Creates a new sampler object that samples any python process in the
85    /// process or child processes
86    fn new_subprocess_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
87        let process = remoteprocess::Process::new(pid)?;
88
89        // Initialize a PythonSpy object per child, and build up the process tree
90        let mut spies = HashMap::new();
91        let mut retries = 10;
92        spies.insert(pid, PythonSpyThread::new(pid, None, &config)?);
93
94        loop {
95            for (childpid, parentpid) in process.child_processes()? {
96                // If we can't create the child process, don't worry about it
97                // can happen with zombie child processes etc
98                match PythonSpyThread::new(childpid, Some(parentpid), &config) {
99                    Ok(spy)  => { spies.insert(childpid, spy); },
100                    Err(e) => { warn!("Failed to open process {}: {}", childpid, e); }
101                }
102            }
103
104            // wait for all the various python spy objects to initialize, and break out of here
105            // if we have one of them started.
106            if spies.values_mut().any(|spy| spy.wait_initialized()) {
107                break;
108            }
109
110            // Otherwise sleep for a short time and retry
111            retries -= 1;
112            if retries == 0 {
113                return Err(format_err!("No python processes found in process {} or any of its subprocesses", pid));
114            }
115            std::thread::sleep(std::time::Duration::from_millis(100));
116        }
117
118        // Create a new thread to periodically monitor for new child processes, and update
119        // the procesess map
120        let spies = Arc::new(Mutex::new(spies));
121        let monitor_spies = spies.clone();
122        let monitor_config = config.clone();
123        std::thread::spawn(move || {
124            while process.exe().is_ok() {
125                match monitor_spies.lock() {
126                    Ok(mut spies) => {
127                        for (childpid, parentpid) in process.child_processes().expect("failed to get subprocesses") {
128                            if spies.contains_key(&childpid) {
129                                continue;
130                            }
131                            match PythonSpyThread::new(childpid, Some(parentpid), &monitor_config) {
132                                Ok(spy) => { spies.insert(childpid, spy); }
133                                Err(e) => { warn!("Failed to create spy for {}: {}", childpid, e);  }
134                            }
135                        }
136                    },
137                    Err(e) => { error!("Failed to acquire lock: {}", e); }
138                }
139                std::thread::sleep(Duration::from_millis(100));
140            }
141        });
142
143        let mut process_info = HashMap::new();
144
145        // Create a new thread to generate samples
146        let config = config.clone();
147        let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
148        let sampling_thread = std::thread::spawn(move || {
149            for sleep in Timer::new(config.sampling_rate as f64) {
150                let mut traces = Vec::new();
151                let mut sampling_errors = None;
152
153                let mut spies = match spies.lock() {
154                    Ok(current) => current,
155                    Err(e) => {
156                        error!("Failed to get process tree: {}", e);
157                        continue;
158                    }
159                };
160
161                // Notify all the initialized spies to generate a trace
162                for spy in spies.values_mut() {
163                    if spy.initialized() {
164                        spy.notify();
165                    }
166                }
167
168                // collect the traces from each python spy if possible
169                for spy in spies.values_mut() {
170                    match spy.collect() {
171                        Some(Ok(mut t)) => { traces.append(&mut t) },
172                        Some(Err(e)) => {
173                            let errors = sampling_errors.get_or_insert_with(|| Vec::new());
174                            errors.push((spy.process.pid, e));
175                        },
176                        None => {}
177                    }
178                }
179
180                // Annotate each trace with the process info
181                for trace in traces.iter_mut() {
182                    let pid = trace.pid;
183                    // Annotate each trace with the process info for the current
184                    let process = process_info.entry(pid).or_insert_with(|| {
185                        get_process_info(pid, &spies).map(|p| Arc::new(*p))
186                    });
187                    trace.process_info = process.clone();
188                }
189
190                // Send the collected info back
191                let late = sleep.err();
192                if tx.send(Sample{traces, sampling_errors, late}).is_err() {
193                    break;
194                }
195
196                // If all of our spies have stopped, we're done
197                if spies.len() == 0 || spies.values().all(|x| !x.running) {
198                    break;
199                }
200            }
201        });
202
203        Ok(Sampler{rx: Some(rx), version: None, sampling_thread: Some(sampling_thread)})
204    }
205}
206
207impl Iterator for Sampler {
208    type Item = Sample;
209    fn next(&mut self) -> Option<Self::Item> {
210        self.rx.as_ref().unwrap().recv().ok()
211    }
212}
213
214impl Drop for Sampler {
215    fn drop(&mut self) {
216        self.rx = None;
217        if let Some(t) = self.sampling_thread.take() {
218            t.join().unwrap();
219        }
220    }
221}
222
223struct PythonSpyThread {
224    initialized_rx: Receiver<Result<Version, Error>>,
225    notify_tx: Sender<()>,
226    sample_rx: Receiver<Result<Vec<StackTrace>, Error>>,
227    initialized: Option<Result<Version, Error>>,
228    pub running: bool,
229    notified: bool,
230    pub process: remoteprocess::Process,
231    pub parent: Option<Pid>,
232    pub command_line: String
233}
234
235impl PythonSpyThread {
236    fn new(pid: Pid, parent: Option<Pid>, config: &Config) -> Result<PythonSpyThread, Error> {
237        let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
238        let (notify_tx, notify_rx): (Sender<()>, Receiver<()>) = mpsc::channel();
239        let (sample_tx, sample_rx): (Sender<Result<Vec<StackTrace>, Error>>, Receiver<Result<Vec<StackTrace>, Error>>) = mpsc::channel();
240        let config = config.clone();
241        let process = remoteprocess::Process::new(pid)?;
242        let command_line = process.cmdline().map(|x| x.join(" ")).unwrap_or("".to_owned());
243
244        thread::spawn(move || {
245            // We need to create this object inside the thread here since PythonSpy objects don't
246            // have the Send trait implemented on linux
247            let mut spy = match PythonSpy::retry_new(pid, &config, 5) {
248                Ok(spy) => {
249                    if let Err(_) = initialized_tx.send(Ok(spy.version.clone())) {
250                        return;
251                    }
252                    spy
253                },
254                Err(e) =>  {
255                    warn!("Failed to profile python from process {}: {}", pid, e);
256                    if initialized_tx.send(Err(e)).is_err() {}
257                    return;
258                }
259            };
260
261            for _ in notify_rx.iter() {
262                let result = spy.get_stack_traces();
263                if let Err(_) = result {
264                    if spy.process.exe().is_err() {
265                        info!("stopped sampling pid {} because the process exited", spy.pid);
266                        break;
267                    }
268                }
269                if sample_tx.send(result).is_err() {
270                    break;
271                }
272            }
273        });
274        Ok(PythonSpyThread{initialized_rx, notify_tx, sample_rx, process, command_line, parent, initialized: None, running: false, notified: false})
275    }
276
277    fn wait_initialized(&mut self) -> bool  {
278        match self.initialized_rx.recv() {
279            Ok(status) => {
280                self.running = status.is_ok();
281                self.initialized = Some(status);
282                self.running
283            },
284            Err(e) => {
285                // shouldn't happen, but will be ok if it does
286                warn!("Failed to get initialization status from PythonSpyThread: {}", e);
287                false
288            }
289        }
290    }
291
292    fn initialized(&mut self) -> bool {
293        if let Some(init) = self.initialized.as_ref() {
294            return init.is_ok();
295        }
296        match self.initialized_rx.try_recv() {
297            Ok(status) => {
298                self.running = status.is_ok();
299                self.initialized = Some(status);
300                self.running
301            },
302            Err(std::sync::mpsc::TryRecvError::Empty) => false,
303            Err(std::sync::mpsc::TryRecvError::Disconnected) => {
304                // this *shouldn't* happen
305                warn!("Failed to get initialization status from PythonSpyThread: disconnected");
306                false
307            }
308        }
309    }
310
311    fn notify(&mut self) {
312        match self.notify_tx.send(()) {
313            Ok(_) => { self.notified = true; },
314            Err(_) => { self.running = false; }
315        }
316    }
317
318    fn collect(&mut self) -> Option<Result<Vec<StackTrace>, Error>>  {
319        if !self.notified {
320            return None;
321        }
322        self.notified = false;
323        match self.sample_rx.recv() {
324            Ok(sample) => Some(sample),
325            Err(_) => {
326                self.running = false;
327                None
328            }
329        }
330    }
331}
332
333fn get_process_info(pid: Pid, spies: &HashMap<Pid, PythonSpyThread>) -> Option<Box<ProcessInfo>> {
334    spies.get(&pid).map(|spy| {
335        let parent = spy.parent.and_then(|parentpid| get_process_info(parentpid, spies));
336        Box::new(ProcessInfo{pid, parent, command_line: spy.command_line.clone()})
337    })
338}