1use std::collections::HashMap;
2use std::sync::mpsc::{self, Sender, Receiver};
3use std::sync::{Mutex, Arc};
4use std::time::Duration;
5use std::thread;
6
7use anyhow::Error;
8
9use remoteprocess::Pid;
10
11use crate::timer::Timer;
12use crate::python_spy::PythonSpy;
13use crate::config::Config;
14use crate::stack_trace::{StackTrace, ProcessInfo};
15use crate::version::Version;
16
/// Handle to a background sampling thread; iterating over it yields the
/// `Sample`s that thread produces.
pub struct Sampler {
    /// Version reported by the spy when sampling a single process; `None`
    /// when the sampler was created in subprocess mode.
    pub version: Option<Version>,
    /// Receiving end of the channel the sampling thread sends samples on.
    /// Set to `None` on drop so the sampling thread's sends start failing.
    rx: Option<Receiver<Sample>>,
    /// Join handle of the sampling thread, taken and joined on drop.
    sampling_thread: Option<thread::JoinHandle<()>>,
}
22
/// The result of one sampling tick.
pub struct Sample {
    /// Stack traces gathered during this tick (empty when sampling failed).
    pub traces: Vec<StackTrace>,
    /// Errors hit while sampling, paired with the pid they occurred in;
    /// `None` when no error occurred.
    pub sampling_errors: Option<Vec<(Pid, Error)>>,
    /// Taken from the `Err` side of the value yielded by `Timer`.
    /// NOTE(review): presumably how far behind schedule this tick ran — confirm against `Timer`.
    pub late: Option<Duration>
}
28
29impl Sampler {
30 pub fn new(pid: Pid, config: &Config) -> Result<Sampler, Error> {
31 if config.subprocesses {
32 Self::new_subprocess_sampler(pid, config)
33 } else {
34 Self::new_sampler(pid, config)
35 }
36 }
37
38 fn new_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
40 let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
41 let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
42 let config = config.clone();
43 let sampling_thread = thread::spawn(move || {
44 let mut spy = match PythonSpy::retry_new(pid, &config, 20) {
47 Ok(spy) => {
48 if let Err(_) = initialized_tx.send(Ok(spy.version.clone())) {
49 return;
50 }
51 spy
52 },
53 Err(e) => {
54 if initialized_tx.send(Err(e)).is_err() {}
55 return;
56 }
57 };
58
59 for sleep in Timer::new(spy.config.sampling_rate as f64) {
60 let mut sampling_errors = None;
61 let traces = match spy.get_stack_traces() {
62 Ok(traces) => traces,
63 Err(e) => {
64 if spy.process.exe().is_err() {
65 info!("stopped sampling pid {} because the process exited", spy.pid);
66 break;
67 }
68 sampling_errors = Some(vec![(spy.pid, e)]);
69 Vec::new()
70 }
71 };
72
73 let late = sleep.err();
74 if tx.send(Sample{traces: traces, sampling_errors, late}).is_err() {
75 break;
76 }
77 }
78 });
79
80 let version = initialized_rx.recv()??;
81 Ok(Sampler{rx: Some(rx), version: Some(version), sampling_thread: Some(sampling_thread)})
82 }
83
84 fn new_subprocess_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
87 let process = remoteprocess::Process::new(pid)?;
88
89 let mut spies = HashMap::new();
91 let mut retries = 10;
92 spies.insert(pid, PythonSpyThread::new(pid, None, &config)?);
93
94 loop {
95 for (childpid, parentpid) in process.child_processes()? {
96 match PythonSpyThread::new(childpid, Some(parentpid), &config) {
99 Ok(spy) => { spies.insert(childpid, spy); },
100 Err(e) => { warn!("Failed to open process {}: {}", childpid, e); }
101 }
102 }
103
104 if spies.values_mut().any(|spy| spy.wait_initialized()) {
107 break;
108 }
109
110 retries -= 1;
112 if retries == 0 {
113 return Err(format_err!("No python processes found in process {} or any of its subprocesses", pid));
114 }
115 std::thread::sleep(std::time::Duration::from_millis(100));
116 }
117
118 let spies = Arc::new(Mutex::new(spies));
121 let monitor_spies = spies.clone();
122 let monitor_config = config.clone();
123 std::thread::spawn(move || {
124 while process.exe().is_ok() {
125 match monitor_spies.lock() {
126 Ok(mut spies) => {
127 for (childpid, parentpid) in process.child_processes().expect("failed to get subprocesses") {
128 if spies.contains_key(&childpid) {
129 continue;
130 }
131 match PythonSpyThread::new(childpid, Some(parentpid), &monitor_config) {
132 Ok(spy) => { spies.insert(childpid, spy); }
133 Err(e) => { warn!("Failed to create spy for {}: {}", childpid, e); }
134 }
135 }
136 },
137 Err(e) => { error!("Failed to acquire lock: {}", e); }
138 }
139 std::thread::sleep(Duration::from_millis(100));
140 }
141 });
142
143 let mut process_info = HashMap::new();
144
145 let config = config.clone();
147 let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
148 let sampling_thread = std::thread::spawn(move || {
149 for sleep in Timer::new(config.sampling_rate as f64) {
150 let mut traces = Vec::new();
151 let mut sampling_errors = None;
152
153 let mut spies = match spies.lock() {
154 Ok(current) => current,
155 Err(e) => {
156 error!("Failed to get process tree: {}", e);
157 continue;
158 }
159 };
160
161 for spy in spies.values_mut() {
163 if spy.initialized() {
164 spy.notify();
165 }
166 }
167
168 for spy in spies.values_mut() {
170 match spy.collect() {
171 Some(Ok(mut t)) => { traces.append(&mut t) },
172 Some(Err(e)) => {
173 let errors = sampling_errors.get_or_insert_with(|| Vec::new());
174 errors.push((spy.process.pid, e));
175 },
176 None => {}
177 }
178 }
179
180 for trace in traces.iter_mut() {
182 let pid = trace.pid;
183 let process = process_info.entry(pid).or_insert_with(|| {
185 get_process_info(pid, &spies).map(|p| Arc::new(*p))
186 });
187 trace.process_info = process.clone();
188 }
189
190 let late = sleep.err();
192 if tx.send(Sample{traces, sampling_errors, late}).is_err() {
193 break;
194 }
195
196 if spies.len() == 0 || spies.values().all(|x| !x.running) {
198 break;
199 }
200 }
201 });
202
203 Ok(Sampler{rx: Some(rx), version: None, sampling_thread: Some(sampling_thread)})
204 }
205}
206
207impl Iterator for Sampler {
208 type Item = Sample;
209 fn next(&mut self) -> Option<Self::Item> {
210 self.rx.as_ref().unwrap().recv().ok()
211 }
212}
213
214impl Drop for Sampler {
215 fn drop(&mut self) {
216 self.rx = None;
217 if let Some(t) = self.sampling_thread.take() {
218 t.join().unwrap();
219 }
220 }
221}
222
/// Controller for one worker thread that samples a single process on request.
struct PythonSpyThread {
    /// Receives the worker's one-time initialization result.
    initialized_rx: Receiver<Result<Version, Error>>,
    /// Sends a unit message asking the worker to take one sample.
    notify_tx: Sender<()>,
    /// Receives the result of each requested sample.
    sample_rx: Receiver<Result<Vec<StackTrace>, Error>>,
    /// Cached initialization result; `None` until the worker has reported.
    initialized: Option<Result<Version, Error>>,
    /// Set from the initialization result and cleared once a channel to the
    /// worker turns out to be closed.
    pub running: bool,
    /// True between a successful `notify` and the following `collect`.
    notified: bool,
    /// Handle to the process being sampled.
    pub process: remoteprocess::Process,
    /// Pid of the parent process, if this process was found as a child.
    pub parent: Option<Pid>,
    /// Command line arguments joined with spaces; empty if they could not be read.
    pub command_line: String
}
234
235impl PythonSpyThread {
236 fn new(pid: Pid, parent: Option<Pid>, config: &Config) -> Result<PythonSpyThread, Error> {
237 let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
238 let (notify_tx, notify_rx): (Sender<()>, Receiver<()>) = mpsc::channel();
239 let (sample_tx, sample_rx): (Sender<Result<Vec<StackTrace>, Error>>, Receiver<Result<Vec<StackTrace>, Error>>) = mpsc::channel();
240 let config = config.clone();
241 let process = remoteprocess::Process::new(pid)?;
242 let command_line = process.cmdline().map(|x| x.join(" ")).unwrap_or("".to_owned());
243
244 thread::spawn(move || {
245 let mut spy = match PythonSpy::retry_new(pid, &config, 5) {
248 Ok(spy) => {
249 if let Err(_) = initialized_tx.send(Ok(spy.version.clone())) {
250 return;
251 }
252 spy
253 },
254 Err(e) => {
255 warn!("Failed to profile python from process {}: {}", pid, e);
256 if initialized_tx.send(Err(e)).is_err() {}
257 return;
258 }
259 };
260
261 for _ in notify_rx.iter() {
262 let result = spy.get_stack_traces();
263 if let Err(_) = result {
264 if spy.process.exe().is_err() {
265 info!("stopped sampling pid {} because the process exited", spy.pid);
266 break;
267 }
268 }
269 if sample_tx.send(result).is_err() {
270 break;
271 }
272 }
273 });
274 Ok(PythonSpyThread{initialized_rx, notify_tx, sample_rx, process, command_line, parent, initialized: None, running: false, notified: false})
275 }
276
277 fn wait_initialized(&mut self) -> bool {
278 match self.initialized_rx.recv() {
279 Ok(status) => {
280 self.running = status.is_ok();
281 self.initialized = Some(status);
282 self.running
283 },
284 Err(e) => {
285 warn!("Failed to get initialization status from PythonSpyThread: {}", e);
287 false
288 }
289 }
290 }
291
292 fn initialized(&mut self) -> bool {
293 if let Some(init) = self.initialized.as_ref() {
294 return init.is_ok();
295 }
296 match self.initialized_rx.try_recv() {
297 Ok(status) => {
298 self.running = status.is_ok();
299 self.initialized = Some(status);
300 self.running
301 },
302 Err(std::sync::mpsc::TryRecvError::Empty) => false,
303 Err(std::sync::mpsc::TryRecvError::Disconnected) => {
304 warn!("Failed to get initialization status from PythonSpyThread: disconnected");
306 false
307 }
308 }
309 }
310
311 fn notify(&mut self) {
312 match self.notify_tx.send(()) {
313 Ok(_) => { self.notified = true; },
314 Err(_) => { self.running = false; }
315 }
316 }
317
318 fn collect(&mut self) -> Option<Result<Vec<StackTrace>, Error>> {
319 if !self.notified {
320 return None;
321 }
322 self.notified = false;
323 match self.sample_rx.recv() {
324 Ok(sample) => Some(sample),
325 Err(_) => {
326 self.running = false;
327 None
328 }
329 }
330 }
331}
332
333fn get_process_info(pid: Pid, spies: &HashMap<Pid, PythonSpyThread>) -> Option<Box<ProcessInfo>> {
334 spies.get(&pid).map(|spy| {
335 let parent = spy.parent.and_then(|parentpid| get_process_info(parentpid, spies));
336 Box::new(ProcessInfo{pid, parent, command_line: spy.command_line.clone()})
337 })
338}