use crate::conversion::IntoPyObject;
use crate::err::{self, PyResult};
use crate::internal::state::{AttachGuard, SuspendAttach};
use crate::types::any::PyAnyMethods;
use crate::types::{
PyAny, PyCode, PyCodeMethods, PyDict, PyEllipsis, PyModule, PyNone, PyNotImplemented, PyString,
PyType,
};
use crate::version::PythonVersionInfo;
use crate::{ffi, Bound, Py, PyTypeInfo};
use std::ffi::CStr;
use std::marker::PhantomData;
#[cfg_attr(docsrs, doc(cfg(all())))] #[cfg(not(feature = "nightly"))]
pub unsafe trait Ungil {}
#[cfg_attr(docsrs, doc(cfg(all())))] #[cfg(not(feature = "nightly"))]
unsafe impl<T: Send> Ungil for T {}
#[cfg(feature = "nightly")]
mod nightly {
macro_rules! define {
($($tt:tt)*) => { $($tt)* }
}
define! {
pub unsafe auto trait Ungil {}
}
impl !Ungil for crate::Python<'_> {}
impl !Ungil for crate::PyAny {}
impl<T> !Ungil for crate::PyRef<'_, T> {}
impl<T> !Ungil for crate::PyRefMut<'_, T> {}
impl !Ungil for crate::ffi::PyObject {}
impl !Ungil for crate::ffi::PyLongObject {}
impl !Ungil for crate::ffi::PyThreadState {}
impl !Ungil for crate::ffi::PyInterpreterState {}
impl !Ungil for crate::ffi::PyWeakReference {}
impl !Ungil for crate::ffi::PyFrameObject {}
impl !Ungil for crate::ffi::PyCodeObject {}
#[cfg(not(Py_LIMITED_API))]
impl !Ungil for crate::ffi::PyDictKeysObject {}
#[cfg(not(any(Py_LIMITED_API, Py_3_10)))]
impl !Ungil for crate::ffi::PyArena {}
}
#[cfg(feature = "nightly")]
pub use nightly::Ungil;
#[derive(Copy, Clone)]
pub struct Python<'py>(PhantomData<&'py AttachGuard>, PhantomData<NotSend>);
struct NotSend(PhantomData<*mut Python<'static>>);
impl Python<'_> {
#[cfg_attr(
not(any(PyPy, GraalPy)),
doc = "[`Python::initialize`](crate::marker::Python::initialize)"
)]
#[cfg_attr(PyPy, doc = "`Python::initialize")]
#[inline]
#[track_caller]
pub fn attach<F, R>(f: F) -> R
where
F: for<'py> FnOnce(Python<'py>) -> R,
{
let guard = AttachGuard::attach();
f(guard.python())
}
#[inline]
#[track_caller]
pub fn try_attach<F, R>(f: F) -> Option<R>
where
F: for<'py> FnOnce(Python<'py>) -> R,
{
let guard = AttachGuard::try_attach().ok()?;
Some(f(guard.python()))
}
#[cfg(not(any(PyPy, GraalPy)))]
pub fn initialize() {
crate::interpreter_lifecycle::initialize();
}
#[inline]
#[track_caller]
pub unsafe fn attach_unchecked<F, R>(f: F) -> R
where
F: for<'py> FnOnce(Python<'py>) -> R,
{
let guard = unsafe { AttachGuard::attach_unchecked() };
f(guard.python())
}
}
impl<'py> Python<'py> {
pub fn detach<T, F>(self, f: F) -> T
where
F: Ungil + FnOnce() -> T,
T: Ungil,
{
let _guard = unsafe { SuspendAttach::new() };
f()
}
pub fn eval(
self,
code: &CStr,
globals: Option<&Bound<'py, PyDict>>,
locals: Option<&Bound<'py, PyDict>>,
) -> PyResult<Bound<'py, PyAny>> {
let code = PyCode::compile(self, code, c"<string>", crate::types::PyCodeInput::Eval)?;
code.run(globals, locals)
}
pub fn run(
self,
code: &CStr,
globals: Option<&Bound<'py, PyDict>>,
locals: Option<&Bound<'py, PyDict>>,
) -> PyResult<()> {
let code = PyCode::compile(self, code, c"<string>", crate::types::PyCodeInput::File)?;
code.run(globals, locals).map(|obj| {
debug_assert!(obj.is_none());
})
}
#[inline]
pub fn get_type<T>(self) -> Bound<'py, PyType>
where
T: PyTypeInfo,
{
T::type_object(self)
}
pub fn import<N>(self, name: N) -> PyResult<Bound<'py, PyModule>>
where
N: IntoPyObject<'py, Target = PyString>,
{
PyModule::import(self, name)
}
#[allow(non_snake_case)] #[inline]
pub fn None(self) -> Py<PyAny> {
PyNone::get(self).to_owned().into_any().unbind()
}
#[allow(non_snake_case)] #[inline]
pub fn Ellipsis(self) -> Py<PyAny> {
PyEllipsis::get(self).to_owned().into_any().unbind()
}
#[allow(non_snake_case)] #[inline]
pub fn NotImplemented(self) -> Py<PyAny> {
PyNotImplemented::get(self).to_owned().into_any().unbind()
}
pub fn version(self) -> &'py str {
unsafe {
CStr::from_ptr(ffi::Py_GetVersion())
.to_str()
.expect("Python version string not UTF-8")
}
}
pub fn version_info(self) -> PythonVersionInfo<'py> {
let version_str = self.version();
let version_number_str = version_str.split(' ').next().unwrap_or(version_str);
PythonVersionInfo::from_str(version_number_str).unwrap()
}
pub fn check_signals(self) -> PyResult<()> {
err::error_on_minusone(self, unsafe { ffi::PyErr_CheckSignals() })
}
}
impl<'unbound> Python<'unbound> {
#[inline]
pub unsafe fn assume_attached() -> Python<'unbound> {
Python(PhantomData, PhantomData)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
internal::state::ForbidAttaching,
types::{IntoPyDict, PyList},
};
#[test]
fn test_eval() {
Python::attach(|py| {
let v: i32 = py
.eval(c"min(1, 2)", None, None)
.map_err(|e| e.display(py))
.unwrap()
.extract()
.unwrap();
assert_eq!(v, 1);
let d = [("foo", 13)].into_py_dict(py).unwrap();
let v: i32 = py
.eval(c"foo + 29", Some(&d), None)
.unwrap()
.extract()
.unwrap();
assert_eq!(v, 42);
let v: i32 = py
.eval(c"foo + 29", None, Some(&d))
.unwrap()
.extract()
.unwrap();
assert_eq!(v, 42);
let v: i32 = py
.eval(c"min(foo, 2)", None, Some(&d))
.unwrap()
.extract()
.unwrap();
assert_eq!(v, 2);
});
}
#[test]
#[cfg(not(target_arch = "wasm32"))] fn test_detach_releases_and_acquires_gil() {
Python::attach(|py| {
let b = std::sync::Arc::new(std::sync::Barrier::new(2));
let b2 = b.clone();
std::thread::spawn(move || Python::attach(|_| b2.wait()));
py.detach(|| {
b.wait();
});
unsafe {
let tstate = ffi::PyEval_SaveThread();
ffi::PyEval_RestoreThread(tstate);
}
});
}
#[test]
#[cfg(panic = "unwind")]
fn test_detach_panics_safely() {
Python::attach(|py| {
let result = std::panic::catch_unwind(|| unsafe {
let py = Python::assume_attached();
py.detach(|| {
panic!("There was a panic!");
});
});
assert!(result.is_err());
let list = PyList::new(py, [1, 2, 3, 4]).unwrap();
assert_eq!(list.extract::<Vec<i32>>().unwrap(), vec![1, 2, 3, 4]);
});
}
#[cfg(not(pyo3_disable_reference_pool))]
#[test]
fn test_detach_pass_stuff_in() {
let list = Python::attach(|py| PyList::new(py, vec!["foo", "bar"]).unwrap().unbind());
let mut v = vec![1, 2, 3];
let a = std::sync::Arc::new(String::from("foo"));
Python::attach(|py| {
py.detach(|| {
drop((list, &mut v, a));
});
});
}
#[test]
#[cfg(not(Py_LIMITED_API))]
fn test_acquire_gil() {
use std::ffi::c_int;
const GIL_NOT_HELD: c_int = 0;
const GIL_HELD: c_int = 1;
#[cfg(not(any(PyPy, GraalPy)))]
Python::initialize();
let state = unsafe { crate::ffi::PyGILState_Check() };
assert_eq!(state, GIL_NOT_HELD);
Python::attach(|_| {
let state = unsafe { crate::ffi::PyGILState_Check() };
assert_eq!(state, GIL_HELD);
});
let state = unsafe { crate::ffi::PyGILState_Check() };
assert_eq!(state, GIL_NOT_HELD);
}
#[test]
fn test_ellipsis() {
Python::attach(|py| {
assert_eq!(py.Ellipsis().to_string(), "Ellipsis");
let v = py
.eval(c"...", None, None)
.map_err(|e| e.display(py))
.unwrap();
assert!(v.eq(py.Ellipsis()).unwrap());
});
}
#[test]
fn test_py_run_inserts_globals() {
use crate::types::dict::PyDictMethods;
Python::attach(|py| {
let namespace = PyDict::new(py);
py.run(
c"class Foo: pass\na = int(3)",
Some(&namespace),
Some(&namespace),
)
.unwrap();
assert!(matches!(namespace.get_item("Foo"), Ok(Some(..))));
assert!(matches!(namespace.get_item("a"), Ok(Some(..))));
#[cfg(not(Py_3_10))]
assert!(matches!(namespace.get_item("__builtins__"), Ok(Some(..))));
})
}
#[cfg(feature = "macros")]
#[test]
fn test_py_run_inserts_globals_2() {
use std::ffi::CString;
#[crate::pyclass(crate = "crate", skip_from_py_object)]
#[derive(Clone)]
struct CodeRunner {
code: CString,
}
impl CodeRunner {
fn reproducer(&mut self, py: Python<'_>) -> PyResult<()> {
let variables = PyDict::new(py);
variables.set_item("cls", crate::Py::new(py, self.clone())?)?;
py.run(self.code.as_c_str(), Some(&variables), None)?;
Ok(())
}
}
#[crate::pymethods(crate = "crate")]
impl CodeRunner {
fn func(&mut self, py: Python<'_>) -> PyResult<()> {
py.import("math")?;
Ok(())
}
}
let mut runner = CodeRunner {
code: CString::new(
r#"
cls.func()
"#
.to_string(),
)
.unwrap(),
};
Python::attach(|py| {
runner.reproducer(py).unwrap();
});
}
#[test]
fn python_is_zst() {
assert_eq!(std::mem::size_of::<Python<'_>>(), 0);
}
#[test]
fn test_try_attach_fail_during_gc() {
Python::attach(|_| {
assert!(Python::try_attach(|_| {}).is_some());
let guard = ForbidAttaching::during_traverse();
assert!(Python::try_attach(|_| {}).is_none());
drop(guard);
assert!(Python::try_attach(|_| {}).is_some());
})
}
#[test]
fn test_try_attach_ok_when_detached() {
Python::attach(|py| {
py.detach(|| {
assert!(Python::try_attach(|_| {}).is_some());
});
});
}
}