use std;
use std::collections::HashMap;
#[cfg(all(target_os="linux", unwind))]
use std::collections::HashSet;
use std::mem::size_of;
use std::slice;
use std::path::Path;
#[cfg(all(target_os="linux", unwind))]
use std::iter::FromIterator;
use regex::Regex;
use failure::{Error, ResultExt};
use remoteprocess::{Process, ProcessMemory, Pid, Tid};
use proc_maps::{get_process_maps, MapRange};
use crate::binary_parser::{parse_binary, BinaryInfo};
use crate::config::Config;
#[cfg(unwind)]
use crate::native_stack_trace::NativeStack;
use crate::python_bindings::{pyruntime, v2_7_15, v3_3_7, v3_5_5, v3_6_6, v3_7_0, v3_8_0};
use crate::python_interpreters::{self, InterpreterState, ThreadState};
use crate::stack_trace::{StackTrace, get_stack_traces, get_stack_trace};
use crate::version::Version;
pub struct PythonSpy {
pub pid: Pid,
pub process: Process,
pub version: Version,
pub interpreter_address: usize,
pub threadstate_address: usize,
pub python_filename: String,
pub version_string: String,
pub config: Config,
#[cfg(unwind)]
pub native: Option<NativeStack>,
pub short_filenames: HashMap<String, Option<String>>,
pub python_thread_ids: HashMap<u64, Tid>,
#[cfg(target_os="linux")]
pub dockerized: bool
}
impl PythonSpy {
pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, Error> {
let process = remoteprocess::Process::new(pid)
.context("Failed to open process - check if it is running.")?;
let python_info = PythonProcessInfo::new(&process)?;
#[cfg(target_os="freebsd")]
let _lock = process.lock();
let version = get_python_version(&python_info, &process)?;
info!("python version {} detected", version);
let interpreter_address = get_interpreter_address(&python_info, &process, &version)?;
info!("Found interpreter at 0x{:016x}", interpreter_address);
let threadstate_address = match version {
Version{major: 3, minor: 7..=9, ..} => {
match python_info.get_symbol("_PyRuntime") {
Some(&addr) => {
if let Some(offset) = pyruntime::get_tstate_current_offset(&version) {
info!("Found _PyRuntime @ 0x{:016x}, getting gilstate.tstate_current from offset 0x{:x}",
addr, offset);
addr as usize + offset
} else {
warn!("Unknown pyruntime.gilstate.tstate_current offset for version {:?}", version);
0
}
},
None => {
warn!("Failed to find _PyRuntime symbol - won't be able to detect GIL usage");
0
}
}
},
_ => {
match python_info.get_symbol("_PyThreadState_Current") {
Some(&addr) => {
info!("Found _PyThreadState_Current @ 0x{:016x}", addr);
addr as usize
},
None => {
warn!("Failed to find _PyThreadState_Current symbol - won't be able to detect GIL usage");
0
}
}
}
};
let version_string = format!("python{}.{}", version.major, version.minor);
#[cfg(all(unwind, not(target_os="linux")))]
let native = if config.native {
Some(NativeStack::new(pid, python_info.python_binary, python_info.libpython_binary)?)
} else {
None
};
#[cfg(all(unwind, target_os="linux"))]
let native = Some(NativeStack::new(pid, python_info.python_binary, python_info.libpython_binary)?);
Ok(PythonSpy{pid, process, version, interpreter_address, threadstate_address,
python_filename: python_info.python_filename,
version_string,
#[cfg(unwind)]
native,
#[cfg(target_os="linux")]
dockerized: python_info.dockerized,
config: config.clone(),
short_filenames: HashMap::new(),
python_thread_ids: HashMap::new()})
}
pub fn retry_new(pid: Pid, config: &Config, max_retries:u64) -> Result<PythonSpy, Error> {
let mut retries = 0;
loop {
let err = match PythonSpy::new(pid, config) {
Ok(mut process) => {
match process.get_stack_traces() {
Ok(_) => return Ok(process),
Err(err) => err
}
},
Err(err) => err
};
retries += 1;
if retries >= max_retries {
return Err(err);
}
info!("Failed to connect to process, retrying. Error: {}", err);
std::thread::sleep(std::time::Duration::from_millis(20));
}
}
pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, Error> {
match self.version {
Version{major: 2, minor: 3..=7, ..} => self._get_stack_traces::<v2_7_15::_is>(),
Version{major: 3, minor: 3, ..} => self._get_stack_traces::<v3_3_7::_is>(),
Version{major: 3, minor: 4, ..} => self._get_stack_traces::<v3_5_5::_is>(),
Version{major: 3, minor: 5, ..} => self._get_stack_traces::<v3_5_5::_is>(),
Version{major: 3, minor: 6, ..} => self._get_stack_traces::<v3_6_6::_is>(),
Version{major: 3, minor: 7, ..} => self._get_stack_traces::<v3_7_0::_is>(),
Version{major: 3, minor: 8, patch: 0, ..} => {
match self.version.release_flags.as_ref() {
"a1" | "a2" | "a3" => self._get_stack_traces::<v3_7_0::_is>(),
_ => self._get_stack_traces::<v3_8_0::_is>()
}
}
Version{major: 3, minor: 8..=9, ..} => self._get_stack_traces::<v3_8_0::_is>(),
_ => Err(format_err!("Unsupported version of Python: {}", self.version)),
}
}
fn _get_stack_traces<I: InterpreterState>(&mut self) -> Result<Vec<StackTrace>, Error> {
let mut thread_activity = HashMap::new();
for thread in self.process.threads()?.iter() {
let threadid: Tid = thread.id()?;
thread_activity.insert(threadid, thread.active()?);
}
let _lock = if self.config.non_blocking {
None
} else {
Some(self.process.lock().context("Failed to suspend process")?)
};
let gil_thread_id = self._get_gil_threadid::<I>()?;
let interp: I = self.process.copy_struct(self.interpreter_address)
.context("Failed to copy PyInterpreterState from process")?;
let mut traces = Vec::new();
let mut threads = interp.head();
while !threads.is_null() {
let thread = self.process.copy_pointer(threads).context("Failed to copy PyThreadState")?;
let mut trace = get_stack_trace(&thread, &self.process)?;
let python_thread_id = thread.thread_id();
let mut os_thread_id = self._get_os_thread_id(python_thread_id, &interp)?;
if let Some(tid) = os_thread_id {
if thread_activity.len() > 0 && !thread_activity.contains_key(&tid) {
info!("clearing away thread_id cache, thread {} has exitted", tid);
self.python_thread_ids.clear();
os_thread_id = self._get_os_thread_id(python_thread_id, &interp)?;
}
}
trace.os_thread_id = os_thread_id.map(|id| id as u64);
trace.owns_gil = trace.thread_id == gil_thread_id;
trace.active = true;
if let Some(id) = os_thread_id {
if let Some(active) = thread_activity.get(&id) {
trace.active = *active;
}
}
if trace.active {
trace.active = !self._heuristic_is_thread_idle(&trace);
}
#[cfg(unwind)]
{
if self.config.native {
if let Some(native) = self.native.as_mut() {
let os_thread = remoteprocess::Thread::new(os_thread_id.unwrap())?;
trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
}
}
}
for frame in &mut trace.frames {
frame.short_filename = self.shorten_filename(&frame.filename);
}
traces.push(trace);
if traces.len() > 4096 {
return Err(format_err!("Max thread recursion depth reached"));
}
threads = thread.next();
}
Ok(traces)
}
fn _heuristic_is_thread_idle(&self, trace: &StackTrace) -> bool {
let frames = &trace.frames;
if frames.is_empty() {
false
} else {
let frame = &frames[0];
(frame.name == "wait" && frame.filename.ends_with("threading.py")) ||
(frame.name == "select" && frame.filename.ends_with("selectors.py")) ||
(frame.name == "poll" && (frame.filename.ends_with("asyncore.py") ||
frame.filename.contains("zmq") ||
frame.filename.contains("gevent") ||
frame.filename.contains("tornado")))
}
}
#[cfg(windows)]
fn _get_os_thread_id<I: InterpreterState>(&mut self, python_thread_id: u64, _interp: &I) -> Result<Option<Tid>, Error> {
Ok(Some(python_thread_id as Tid))
}
#[cfg(target_os="macos")]
fn _get_os_thread_id<I: InterpreterState>(&mut self, python_thread_id: u64, _interp: &I) -> Result<Option<Tid>, Error> {
if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
return Ok(Some(*thread_id));
}
for thread in self.process.threads()?.iter() {
let current_handle = thread.thread_handle()? - 224;
self.python_thread_ids.insert(current_handle, thread.id()?);
}
if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
return Ok(Some(*thread_id));
}
Ok(None)
}
#[cfg(all(target_os="linux", not(unwind)))]
fn _get_os_thread_id<I: InterpreterState>(&mut self, _python_thread_id: u64, _interp: &I) -> Result<Option<Tid>, Error> {
Ok(None)
}
#[cfg(all(target_os="linux", unwind))]
fn _get_os_thread_id<I: InterpreterState>(&mut self, python_thread_id: u64, interp: &I) -> Result<Option<Tid>, Error> {
if self.config.non_blocking {
return Ok(None);
}
if self.dockerized {
return Ok(None);
}
if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
return Ok(Some(*thread_id));
}
let mut all_python_threads = HashSet::new();
let mut threads = interp.head();
while !threads.is_null() {
let thread = self.process.copy_pointer(threads).context("Failed to copy PyThreadState")?;
let current = thread.thread_id();
all_python_threads.insert(current);
threads = thread.next();
}
let processed_os_threads: HashSet<Tid> = HashSet::from_iter(self.python_thread_ids.values().map(|x| *x));
let native = self.native.as_ref().unwrap();
for thread in self.process.threads()?.iter() {
let threadid = thread.id()?;
if processed_os_threads.contains(&threadid) {
continue;
}
match native.get_pthread_id(&thread, &all_python_threads) {
Ok(pthread_id) => {
if pthread_id != 0 {
self.python_thread_ids.insert(pthread_id, threadid);
}
},
Err(e) => { warn!("Failed to get get_pthread_id for {}: {}", threadid, e); }
};
}
if !processed_os_threads.contains(&self.pid) {
let mut unknown_python_threadids = HashSet::new();
for python_thread_id in all_python_threads.iter() {
if !self.python_thread_ids.contains_key(python_thread_id) {
unknown_python_threadids.insert(*python_thread_id);
}
}
if unknown_python_threadids.len() == 1 {
let python_thread_id = *unknown_python_threadids.iter().next().unwrap();
self.python_thread_ids.insert(python_thread_id, self.pid);
} else {
warn!("failed to get python threadid for main thread!");
}
}
if let Some(thread_id) = self.python_thread_ids.get(&python_thread_id) {
return Ok(Some(*thread_id));
}
info!("failed looking up python threadid for {}. known python_thread_ids {:?}. all_python_threads {:?}",
python_thread_id, self.python_thread_ids, all_python_threads);
Ok(None)
}
#[cfg(target_os="freebsd")]
fn _get_os_thread_id<I: InterpreterState>(&mut self, _python_thread_id: u64, _interp: &I) -> Result<Option<Tid>, Error> {
Ok(None)
}
fn _get_gil_threadid<I: InterpreterState>(&self) -> Result<u64, Error> {
if self.threadstate_address > 0 {
let addr: usize = self.process.copy_struct(self.threadstate_address)?;
if addr != 0 {
let threadstate: I::ThreadState = self.process.copy_struct(addr)?;
return Ok(threadstate.thread_id());
}
}
Ok(0)
}
fn shorten_filename(&mut self, filename: &str) -> Option<String> {
if let Some(short) = self.short_filenames.get(filename) {
return short.clone();
}
let mut path = Path::new(filename);
while let Some(parent) = path.parent() {
path = parent;
if !parent.join("__init__.py").exists() {
break;
}
}
let shortened = Path::new(filename)
.strip_prefix(path)
.ok()
.map(|p| p.to_string_lossy().to_string());
self.short_filenames.insert(filename.to_owned(), shortened.clone());
shortened
}
}
fn get_python_version(python_info: &PythonProcessInfo, process: &remoteprocess::Process)
-> Result<Version, Error> {
if let Some(&addr) = python_info.get_symbol("Py_GetVersion.version") {
info!("Getting version from symbol address");
if let Ok(bytes) = process.copy(addr as usize, 128) {
if let Ok(version) = Version::scan_bytes(&bytes) {
return Ok(version);
}
}
}
info!("Getting version from python binary BSS");
let bss = process.copy(python_info.python_binary.bss_addr as usize,
python_info.python_binary.bss_size as usize)?;
match Version::scan_bytes(&bss) {
Ok(version) => return Ok(version),
Err(err) => {
info!("Failed to get version from BSS section: {}", err);
if let Some(ref libpython) = python_info.libpython_binary {
info!("Getting version from libpython BSS");
let bss = process.copy(libpython.bss_addr as usize,
libpython.bss_size as usize)?;
match Version::scan_bytes(&bss) {
Ok(version) => return Ok(version),
Err(err) => info!("Failed to get version from libpython BSS section: {}", err)
}
}
}
}
info!("Trying to get version from path: {}", python_info.python_filename);
let path = Path::new(&python_info.python_filename);
if let Some(python) = path.file_name() {
if let Some(python) = python.to_str() {
if python.starts_with("python") {
let tokens: Vec<&str> = python[6..].split('.').collect();
if tokens.len() >= 2 {
if let (Ok(major), Ok(minor)) = (tokens[0].parse::<u64>(), tokens[1].parse::<u64>()) {
return Ok(Version{major, minor, patch:0, release_flags: "".to_owned()})
}
}
}
}
}
Err(format_err!("Failed to find python version from target process"))
}
fn get_interpreter_address(python_info: &PythonProcessInfo,
process: &remoteprocess::Process,
version: &Version) -> Result<usize, Error> {
match version {
Version{major: 3, minor: 7..=9, ..} => {
if let Some(&addr) = python_info.get_symbol("_PyRuntime") {
let addr = process.copy_struct(addr as usize + pyruntime::get_interp_head_offset(&version))?;
match check_interpreter_addresses(&[addr], &python_info.maps, process, version) {
Ok(addr) => return Ok(addr),
Err(_) => { warn!("Interpreter address from _PyRuntime symbol is invalid {:016x}", addr); }
};
}
},
_ => {
if let Some(&addr) = python_info.get_symbol("interp_head") {
let addr = process.copy_struct(addr as usize)?;
match check_interpreter_addresses(&[addr], &python_info.maps, process, version) {
Ok(addr) => return Ok(addr),
Err(_) => { warn!("Interpreter address from interp_head symbol is invalid {:016x}", addr); }
};
}
}
};
info!("Failed to get interp_head from symbols, scanning BSS section from main binary");
match get_interpreter_address_from_binary(&python_info.python_binary, &python_info.maps, process, version) {
Ok(addr) => Ok(addr),
Err(err) => {
match python_info.libpython_binary {
Some(ref libpython) => {
info!("Failed to get interpreter from binary BSS, scanning libpython BSS");
Ok(get_interpreter_address_from_binary(libpython, &python_info.maps, process, version)?)
},
None => Err(err)
}
}
}
}
fn get_interpreter_address_from_binary(binary: &BinaryInfo,
maps: &[MapRange],
process: &remoteprocess::Process,
version: &Version) -> Result<usize, Error> {
let bss = process.copy(binary.bss_addr as usize, binary.bss_size as usize)?;
#[allow(clippy::cast_ptr_alignment)]
let addrs = unsafe { slice::from_raw_parts(bss.as_ptr() as *const usize, bss.len() / size_of::<usize>()) };
check_interpreter_addresses(addrs, maps, process, version)
}
fn check_interpreter_addresses(addrs: &[usize],
maps: &[MapRange],
process: &remoteprocess::Process,
version: &Version) -> Result<usize, Error> {
#[cfg(windows)]
fn maps_contain_addr(_: usize, _: &[MapRange]) -> bool { true }
#[cfg(not(windows))]
use proc_maps::maps_contain_addr;
fn check<I>(addrs: &[usize],
maps: &[MapRange],
process: &remoteprocess::Process) -> Result<usize, Error>
where I: python_interpreters::InterpreterState {
for &addr in addrs {
if maps_contain_addr(addr, maps) {
let interp: I = match process.copy_struct(addr) {
Ok(interp) => interp,
Err(_) => continue
};
let threads = interp.head();
if maps_contain_addr(threads as usize, maps) {
let thread = match process.copy_pointer(threads) {
Ok(thread) => thread,
Err(_) => continue
};
if thread.interp() as usize == addr && get_stack_traces(&interp, process).is_ok() {
return Ok(addr);
}
}
}
}
Err(format_err!("Failed to find a python interpreter in the .data section"))
}
match version {
Version{major: 2, minor: 3..=7, ..} => check::<v2_7_15::_is>(addrs, maps, process),
Version{major: 3, minor: 3, ..} => check::<v3_3_7::_is>(addrs, maps, process),
Version{major: 3, minor: 4..=5, ..} => check::<v3_5_5::_is>(addrs, maps, process),
Version{major: 3, minor: 6, ..} => check::<v3_6_6::_is>(addrs, maps, process),
Version{major: 3, minor: 7, ..} => check::<v3_7_0::_is>(addrs, maps, process),
Version{major: 3, minor: 8, patch: 0, ..} => {
match version.release_flags.as_ref() {
"a1" | "a2" | "a3" => check::<v3_7_0::_is>(addrs, maps, process),
_ => check::<v3_8_0::_is>(addrs, maps, process)
}
},
Version{major: 3, minor: 8..=9, ..} => check::<v3_8_0::_is>(addrs, maps, process),
_ => Err(format_err!("Unsupported version of Python: {}", version))
}
}
pub struct PythonProcessInfo {
python_binary: BinaryInfo,
libpython_binary: Option<BinaryInfo>,
maps: Vec<MapRange>,
python_filename: String,
#[cfg(target_os="linux")]
dockerized: bool,
}
impl PythonProcessInfo {
fn new(process: &remoteprocess::Process) -> Result<PythonProcessInfo, Error> {
let filename = process.exe()
.context("Failed to get process executable name. Check that the process is running.")?;
#[cfg(windows)]
let filename = filename.to_lowercase();
#[cfg(windows)]
let is_python_bin = |pathname: &str| pathname.to_lowercase() == filename;
#[cfg(not(windows))]
let is_python_bin = |pathname: &str| pathname == filename;
let maps = get_process_maps(process.pid)?;
info!("Got virtual memory maps from pid {}:", process.pid);
for map in &maps {
debug!("map: {:016x}-{:016x} {}{}{} {}", map.start(), map.start() + map.size(),
if map.is_read() {'r'} else {'-'}, if map.is_write() {'w'} else {'-'}, if map.is_exec() {'x'} else {'-'},
map.filename().as_ref().unwrap_or(&"".to_owned()));
}
#[cfg(target_os="linux")]
let namespace = match remoteprocess::Namespace::new(process.pid) {
Ok(ns) => Some(ns),
Err(e) => {
warn!("Failed to set namespace: {}", e);
None
}
};
let (python_binary, python_filename) = {
let map = maps.iter()
.find(|m| if let Some(pathname) = &m.filename() {
is_python_bin(pathname) && m.is_exec()
} else {
false
});
let map = match map {
Some(map) => map,
None => {
warn!("Failed to find '{}' in virtual memory maps, falling back to first map region", filename);
&maps[0]
}
};
#[allow(unused_mut)]
let mut python_binary = parse_binary(&filename, map.start() as u64, map.size() as u64)?;
#[cfg(windows)]
python_binary.symbols.extend(get_windows_python_symbols(process.pid, &filename, map.start() as u64)?);
#[cfg(target_os = "macos")]
{
let offset = python_binary.symbols["_mh_execute_header"] - map.start() as u64;
for address in python_binary.symbols.values_mut() {
*address -= offset;
}
if python_binary.bss_addr != 0 {
python_binary.bss_addr -= offset;
}
}
(python_binary, filename.clone())
};
let libpython_binary = {
let libmap = maps.iter()
.find(|m| if let Some(ref pathname) = &m.filename() {
is_python_lib(pathname) && m.is_exec()
} else {
false
});
let mut libpython_binary: Option<BinaryInfo> = None;
if let Some(libpython) = libmap {
if let Some(filename) = &libpython.filename() {
info!("Found libpython binary @ {}", filename);
#[allow(unused_mut)]
let mut parsed = parse_binary(filename, libpython.start() as u64, libpython.size() as u64)?;
#[cfg(windows)]
parsed.symbols.extend(get_windows_python_symbols(process.pid, filename, libpython.start() as u64)?);
libpython_binary = Some(parsed);
}
}
#[cfg(target_os = "macos")]
{
if libpython_binary.is_none() {
use proc_maps::mac_maps::get_dyld_info;
let dyld_infos = get_dyld_info(process.pid)?;
for dyld in &dyld_infos {
let segname = unsafe { std::ffi::CStr::from_ptr(dyld.segment.segname.as_ptr()) };
debug!("dyld: {:016x}-{:016x} {:10} {}",
dyld.segment.vmaddr, dyld.segment.vmaddr + dyld.segment.vmsize,
segname.to_string_lossy(), dyld.filename);
}
let python_dyld_data = dyld_infos.iter()
.find(|m| is_python_framework(&m.filename) &&
m.segment.segname[0..7] == [95, 95, 68, 65, 84, 65, 0]);
if let Some(libpython) = python_dyld_data {
info!("Found libpython binary from dyld @ {}", libpython.filename);
let mut binary = parse_binary(&libpython.filename, libpython.segment.vmaddr, libpython.segment.vmsize)?;
binary.bss_addr = libpython.segment.vmaddr;
binary.bss_size = libpython.segment.vmsize;
libpython_binary = Some(binary);
}
}
}
libpython_binary
};
Ok(PythonProcessInfo{python_binary, libpython_binary, maps, python_filename,
#[cfg(target_os="linux")]
dockerized: match namespace { Some(ns) => ns.is_set(), None => false },
})
}
pub fn get_symbol(&self, symbol: &str) -> Option<&u64> {
if let Some(addr) = self.python_binary.symbols.get(symbol) {
info!("got symbol {} (0x{:016x}) from python binary", symbol, addr);
return Some(addr);
}
if let Some(ref binary) = self.libpython_binary {
if let Some(addr) = binary.symbols.get(symbol) {
info!("got symbol {} (0x{:016x}) from libpython binary", symbol, addr);
return Some(addr);
}
}
None
}
}
#[cfg(windows)]
pub fn get_windows_python_symbols(pid: Pid, filename: &str, offset: u64) -> std::io::Result<HashMap<String, u64>> {
use proc_maps::win_maps::SymbolLoader;
let handler = SymbolLoader::new(pid)?;
let _module = handler.load_module(filename)?;
let mut ret = HashMap::new();
for symbol in ["_PyThreadState_Current", "interp_head", "_PyRuntime"].iter() {
if let Ok((base, addr)) = handler.address_from_name(symbol) {
let addr = if base == 0 { addr } else { offset + addr - base };
ret.insert(String::from(*symbol), addr);
}
}
Ok(ret)
}
#[cfg(any(target_os="linux", target_os="freebsd"))]
pub fn is_python_lib(pathname: &str) -> bool {
lazy_static! {
static ref RE: Regex = Regex::new(r"/libpython\d.\d(m|d|u)?.so").unwrap();
}
RE.is_match(pathname)
}
#[cfg(target_os="macos")]
pub fn is_python_lib(pathname: &str) -> bool {
lazy_static! {
static ref RE: Regex = Regex::new(r"/libpython\d.\d(m|d|u)?.(dylib|so)$").unwrap();
}
RE.is_match(pathname) || is_python_framework(pathname)
}
#[cfg(windows)]
pub fn is_python_lib(pathname: &str) -> bool {
lazy_static! {
static ref RE: Regex = Regex::new(r"\\python\d\d(m|d|u)?.dll$").unwrap();
}
RE.is_match(pathname)
}
#[cfg(target_os="macos")]
pub fn is_python_framework(pathname: &str) -> bool {
pathname.ends_with("/Python") &&
pathname.contains("/Python.framework/") &&
!pathname.contains("Python.app")
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(target_os="macos")]
#[test]
fn test_is_python_lib() {
assert!(is_python_lib("~/Anaconda2/lib/libpython2.7.dylib"));
assert!(is_python_lib("/lib/libpython3.4d.dylib"));
assert!(is_python_lib("/usr/local/lib/libpython3.8m.dylib"));
assert!(is_python_lib("./libpython2.7u.dylib"));
assert!(!is_python_lib("/libboost_python.dylib"));
assert!(!is_python_lib("/lib/heapq.cpython-36m-darwin.dylib"));
}
#[cfg(any(target_os="linux", target_os="freebsd"))]
#[test]
fn test_is_python_lib() {
assert!(is_python_lib("/tmp/_MEIOqzg01/libpython2.7.so.1.0"));
assert!(is_python_lib("./libpython2.7.so"));
assert!(is_python_lib("/usr/lib/libpython3.4d.so"));
assert!(is_python_lib("/usr/local/lib/libpython3.8m.so"));
assert!(is_python_lib("/usr/lib/libpython2.7u.so"));
assert!(!is_python_lib("/usr/lib/libboost_python.so"));
assert!(!is_python_lib("/usr/lib/x86_64-linux-gnu/libboost_python-py27.so.1.58.0"));
assert!(!is_python_lib("/usr/lib/libboost_python-py35.so"));
}
#[cfg(target_os="macos")]
#[test]
fn test_python_frameworks() {
assert!(!is_python_framework("/usr/local/Cellar/python@2/2.7.15_1/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python"));
assert!(is_python_framework("/usr/local/Cellar/python@2/2.7.15_1/Frameworks/Python.framework/Versions/2.7/Python"));
assert!(!is_python_framework("/System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python"));
assert!(is_python_framework("/System/Library/Frameworks/Python.framework/Versions/2.7/Python"));
assert!(is_python_framework("/Users/ben/.pyenv/versions/3.6.6/Python.framework/Versions/3.6/Python"));
assert!(!is_python_framework("/Users/ben/.pyenv/versions/3.6.6/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python"));
}
}