use cpython::*;
#[allow(unused)]
fn test_thread_exit_py(py: Python) -> PyResult<()> {
let m = py.import("sys")?;
m.add(py, "exit_thread", py_fn!(py, exit_thread()))?;
py.run(
"
import sys, threading
# should not abort
threading.Thread(target=sys.exit_thread).start()
",
None,
None,
)?;
py.allow_threads(|| {
std::thread::sleep(std::time::Duration::from_millis(100));
});
Ok(())
}
#[rustversion::all(since(1.40), before(1.48), stable)]
#[test]
fn test_thread_exit() {
let gil = Python::acquire_gil();
let py = gil.python();
test_thread_exit_py(py).unwrap();
}
#[allow(unused)]
fn exit_thread(py: Python) -> PyResult<String> {
#[cfg(unix)]
{
py.allow_threads(|| {
unsafe { libc::pthread_exit(std::ptr::null_mut()) };
})
}
#[cfg(not(unix))]
{
let _ = py;
Ok(String::new())
}
}