use pyo3::prelude::*;
#[cfg(feature = "stubs")]
use pyo3_stub_gen::{PyStubType, TypeInfo};
#[cfg(feature = "async-tokio")]
use std::{
ffi::CString,
future::Future,
sync::{Arc, Mutex},
};
use std::{marker::PhantomData, sync::LazyLock};
/// A typed wrapper around a Python object that is handed to Python as an awaitable.
///
/// The first field is the wrapped Python object, stored unchanged. `T` is carried only
/// through [`PhantomData`]; no value of type `T` is ever stored.
#[derive(Debug, Clone)]
pub struct Awaitable<'py, T>(pub Bound<'py, PyAny>, PhantomData<T>);

impl<'py, T> Awaitable<'py, T> {
    /// Wraps `obj` as-is; the object is not inspected.
    #[must_use]
    pub const fn new(obj: Bound<'py, PyAny>) -> Self {
        Self(obj, PhantomData)
    }
}
/// Extraction from a Python object: every object is accepted and wrapped unchanged.
impl<'py, T> FromPyObject<'_, 'py> for Awaitable<'py, T> {
    type Error = PyErr;

    /// Takes an owned reference to `obj` and wraps it.
    ///
    /// This implementation always returns `Ok`; it performs no check on `obj`.
    fn extract(obj: Borrowed<'_, 'py, PyAny>) -> Result<Self, Self::Error> {
        let owned = obj.to_owned();
        Ok(Self(owned, PhantomData))
    }
}
/// Conversion into Python by value: yields the wrapped object itself.
impl<'py, T> IntoPyObject<'py> for Awaitable<'py, T> {
    type Target = PyAny;
    type Output = Bound<'py, Self::Target>;
    type Error = std::convert::Infallible;

    /// Unwraps and returns the inner Python object; this cannot fail.
    fn into_pyobject(self, _: Python<'py>) -> Result<Self::Output, Self::Error> {
        let Self(inner, _marker) = self;
        Ok(inner)
    }
}
impl<'a, 'py, T> IntoPyObject<'py> for &'a Awaitable<'py, T> {
type Target = PyAny;
type Output = Borrowed<'a, 'py, Self::Target>;
type Error = std::convert::Infallible;
fn into_pyobject(self, _: Python<'py>) -> Result<Self::Output, Self::Error> {
Ok(self.0.as_borrowed())
}
}
impl<'py, T> From<Bound<'py, PyAny>> for Awaitable<'py, T> {
fn from(obj: Bound<'py, PyAny>) -> Self {
Awaitable::new(obj)
}
}
/// Stub-generation support: renders the wrapper as `collections.abc.Awaitable[...]`.
#[cfg(feature = "stubs")]
impl<T> PyStubType for Awaitable<'_, T>
where
    T: PyStubType,
{
    /// Starts from `T`'s own type information, wraps its name in
    /// `collections.abc.Awaitable[..]`, and adds `collections.abc` to the imports
    /// that `T` already requires.
    fn type_output() -> TypeInfo {
        let mut info = T::type_output();
        info.name = format!("collections.abc.Awaitable[{}]", info.name);
        info.import.insert("collections.abc".into());
        info
    }
}
/// Runs a future to completion from synchronous code that holds a `Python` token.
///
/// `$py` must be an identifier and `$body` an expression; a trailing comma is accepted.
/// The invocation expands to a block containing
/// `$crate::sync::invoke_async_from_py_sync($py, $body)`, so its value is whatever
/// that function returns.
#[cfg(feature = "async-tokio")]
#[macro_export]
macro_rules! py_sync {
($py:ident, $body:expr $(,)?) => {{
$crate::sync::invoke_async_from_py_sync($py, $body)
}};
}
/// Name of the module-level variable in [`PY_CODE_WORKER_EVENT_LOOP`] that holds the
/// event loop once the module has finished executing.
const PY_VARNAME_LOOP: &str = "loop";

/// Python source that starts a background event loop.
///
/// When executed as a module it:
/// 1. starts a daemon thread named `rigetti-pyo3-worker-loop`;
/// 2. inside that thread, creates a new asyncio event loop, installs it as the thread's
///    current loop, publishes it through a `concurrent.futures.Future`, and runs it forever;
/// 3. on the importing thread, waits up to one second for the published loop and binds
///    it to the module-level name `loop`.
///
/// The block structure of this source is significant to Python: the body of `_run_loop`
/// must be indented, otherwise the module fails to compile.
const PY_CODE_WORKER_EVENT_LOOP: &str = r#"
import asyncio
import threading
import concurrent.futures

loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]()

def _run_loop() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop_fut.set_result(loop)
    loop.run_forever()

_thread = threading.Thread(
    target=_run_loop,
    name="rigetti-pyo3-worker-loop",
    # Daemon threads do not prevent the Python process from exiting.
    daemon=True,
)
_thread.start()
loop = loop_fut.result(timeout=1)
"#;
/// Name of the function defined by [`PY_CODE_RUN_ON_LOOP`].
const PY_FUNCNAME_RUN_ON_LOOP: &str = "get_result";

/// Python source defining `get_result(loop, awaitable)`.
///
/// The function wraps `awaitable` in a coroutine, submits that coroutine to `loop` with
/// `asyncio.run_coroutine_threadsafe`, and blocks on `.result()` of the returned future,
/// so it returns the awaitable's value or raises its exception.
///
/// The nested function bodies must be indented for Python to accept this source.
const PY_CODE_RUN_ON_LOOP: &str = r"
import asyncio

def get_result(loop, awaitable):
    async def _run_coroutine():
        return await awaitable

    return asyncio.run_coroutine_threadsafe(
        _run_coroutine(),
        loop,
    ).result()
";
/// The Python event loop object produced by executing [`PY_CODE_WORKER_EVENT_LOOP`].
///
/// On first access the initializer attaches to the interpreter, executes the bootstrap
/// source as a module named `py_worker_event_loop`, and stores an owned reference to
/// the module's [`PY_VARNAME_LOOP`] attribute. Later accesses reuse that reference.
///
/// # Panics
///
/// The first access panics if the module cannot be created or if it does not expose
/// the expected variable.
//
// NOTE(review): the initializer runs Python code that waits on another thread. If two
// Python threads can reach the first access concurrently, one could block on the
// `LazyLock` while attached to the interpreter — confirm this cannot deadlock.
static PY_WORKER_EVENT_LOOP: LazyLock<Py<PyAny>> = LazyLock::new(|| {
    Python::attach(|py| {
        // `CString` is spelled with its full path on purpose: the file-level `use` that
        // brings `CString` into scope only exists with the `async-tokio` feature, while
        // this static is not feature-gated.
        let code = std::ffi::CString::new(PY_CODE_WORKER_EVENT_LOOP)
            .expect("worker event loop source must not contain NUL bytes");
        let module_name = std::ffi::CString::new("py_worker_event_loop")
            .expect("module name must not contain NUL bytes");
        // The module name doubles as the file name reported for the compiled source.
        let module = PyModule::from_code(py, &code, &module_name, &module_name)
            .expect("failed to create worker event loop module");
        module
            .getattr(PY_VARNAME_LOOP)
            .expect("failed to get Python event loop variable from module")
            .into()
    })
});
/// Runs `body` to completion and returns its output, blocking the calling thread.
///
/// The future is converted into a Python awaitable bound to the shared worker event loop
/// ([`PY_WORKER_EVENT_LOOP`]). A small Python helper then submits that awaitable to the
/// loop and waits for it to finish. The successful value of `body` travels back to the
/// caller through a shared `Arc<Mutex<Option<T>>>` slot.
///
/// # Errors
///
/// Returns an error if copying the context into the task locals fails, if the awaitable
/// or the helper module cannot be created, if the helper function cannot be looked up,
/// or if calling the helper raises.
///
/// # Panics
///
/// Panics if the result slot's mutex is poisoned when read, or if the helper returned
/// successfully but the slot is still empty.
#[cfg(feature = "async-tokio")]
pub fn invoke_async_from_py_sync<F, T>(py: ::pyo3::Python<'_>, body: F) -> PyResult<T>
where
    F: Future<Output = PyResult<T>> + Send + 'static,
    T: Send + Sync + 'static,
{
    // One handle is moved into the task to store the value; the other stays here to read it.
    let slot_for_task = Arc::new(Mutex::new(None));
    let slot = Arc::clone(&slot_for_task);

    let worker_loop = PY_WORKER_EVENT_LOOP.bind(py);
    let locals =
        ::pyo3_async_runtimes::TaskLocals::new(worker_loop.to_owned()).copy_context(py)?;

    let coro = ::pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals, async move {
        let value = body.await?;
        // A poisoned lock is skipped here; the empty slot is then reported by the
        // `expect` at the end of the function.
        if let Ok(mut guard) = slot_for_task.lock() {
            *guard = Some(value);
        }
        Ok(())
    })?;

    // Build the helper module and call its entry point with the loop and the awaitable.
    let helper_source = CString::new(PY_CODE_RUN_ON_LOOP).unwrap();
    let helper_name = CString::new("py_worker_event_loop_helper").unwrap();
    let helper = PyModule::from_code(py, &helper_source, &helper_name, &helper_name)?;
    let run_on_loop = helper.getattr(PY_FUNCNAME_RUN_ON_LOOP)?;
    let _ = run_on_loop.call((worker_loop, &coro), None)?;

    let value = slot
        .lock()
        .unwrap()
        .take()
        .expect("future must always produce either a result or an error");
    Ok(value)
}
/// Converts a Rust future into a Python awaitable.
///
/// `$py` must be an identifier and `$body` an expression; a trailing comma is accepted.
/// The invocation expands to
/// `$crate::pyo3_async_runtimes::tokio::future_into_py($py, $body)` and evaluates to
/// that call's return value.
#[cfg(feature = "async-tokio")]
#[macro_export]
macro_rules! py_async {
($py:ident, $body:expr $(,)?) => {
$crate::pyo3_async_runtimes::tokio::future_into_py($py, $body)
};
}
/// Generates a blocking and an awaitable Python-facing wrapper from one `async fn`.
///
/// The input is an `async fn NAME(args..) -> PyResult<R> { .. }` carrying at least one
/// attribute. Arguments may carry their own attributes. The expansion contains:
///
/// - `NAME_impl`: an `async fn` without a visibility qualifier that holds the original
///   body. Argument attributes are not copied onto it.
/// - `py_NAME`: a function taking `py` plus the original arguments. It passes the
///   `NAME_impl` future to `$crate::sync::invoke_async_from_py_sync`. Its
///   `#[pyo3(name = ..)]` attribute is built from `NAME`.
/// - `py_NAME_async`: a function taking `py` plus the original arguments. It passes the
///   `NAME_impl` future to `future_into_py` and returns
///   `PyResult<Awaitable<'_, R>>`. Its `#[pyo3(name = ..)]` attribute is built from
///   `NAME` followed by `"_async"`.
///
/// Both wrappers receive the attributes written on the input function, and both wrap
/// the future with `$crate::sync::add_context_if_otel` first.
#[macro_export]
macro_rules! py_function_sync_async {
// Entry arm: emits `NAME_impl` and the blocking wrapper, then re-invokes this macro
// with the `@async_block` marker to emit the awaitable wrapper.
(
$(#[$meta: meta])+
$pub:vis async fn $name:ident($($(#[$arg_meta: meta])*$arg: ident : $kind: ty),* $(,)?)
$(-> PyResult<$ret: ty>)? $body: block
) => {
$crate::paste! {
async fn [< $name _impl >]($($arg: $kind,)*) $(-> PyResult<$ret>)? {
$body
}
$(#[$meta])+
#[allow(clippy::too_many_arguments)]
// `$name ""` is presumably joined into a single string literal by `paste!` — TODO confirm.
#[pyo3(name = $name "")]
$pub fn [< py_ $name >](py: $crate::pyo3::Python<'_> $(, $(#[$arg_meta])*$arg: $kind)*) $(-> PyResult<$ret>)? {
let res = $crate::sync::add_context_if_otel([< $name _impl >]($($arg),*));
$crate::sync::invoke_async_from_py_sync(py, res)
}
}
$crate::py_function_sync_async! {
@async_block {
$(#[$meta])+
$pub async fn $name($($(#[$arg_meta])*$arg : $kind),*) $(-> PyResult<$ret>)? $body
}
}
};
// `@async_block` arm for a function written without a return type: forwards the same
// tokens with `-> ()` appended.
// NOTE(review): the only other `@async_block` arm requires `-> PyResult<..>`, so the
// forwarded `-> ()` form does not appear to match any arm in this macro — confirm
// whether `-> PyResult<()>` was intended, or whether this arm is unused.
(
@async_block {
$(#[$meta: meta])+
$pub:vis async fn $name:ident($($(#[$arg_meta: meta])*$arg: ident : $kind: ty),* $(,)?) $body: block
}
) => {
$crate::py_function_sync_async! {
@async_block {
$(#[$meta])+
$pub async fn $name($($(#[$arg_meta])*$arg: $kind),*) -> () $body
}
};
};
// `@async_block` arm for `-> PyResult<R>`: emits the awaitable wrapper. The body is
// matched but not re-emitted here; the wrapper calls `NAME_impl`, which the entry arm
// generates.
(
@async_block {
$(#[$meta: meta])+
$pub:vis async fn $name:ident($($(#[$arg_meta: meta])*$arg: ident : $kind: ty),* $(,)?)
-> PyResult<$ret:ty> $body: block
}
) => {
$crate::paste! {
$(#[$meta])+
#[pyo3(name = $name "_async")]
#[allow(clippy::too_many_arguments)]
$pub fn [< py_ $name _async >](py: $crate::pyo3::Python<'_> $(, $(#[$arg_meta])*$arg: $kind)*)
-> ::pyo3::PyResult<$crate::sync::Awaitable<'_, $ret>>
{
let res = $crate::sync::add_context_if_otel([< $name _impl >]($($arg),*));
$crate::pyo3_async_runtimes::tokio::future_into_py(py, res)
.map($crate::sync::Awaitable::new)
}
}
};
}
/// Wraps `res` with `FutureExt::with_current_context`, yielding a `WithContext<T>`.
///
/// This variant is compiled only when the `opentelemetry` feature is enabled.
#[cfg(feature = "opentelemetry")]
pub fn add_context_if_otel<T>(
    res: T,
) -> qcs_dependencies_client::opentelemetry::trace::WithContext<T> {
    qcs_dependencies_client::opentelemetry::trace::FutureExt::with_current_context(res)
}

/// Returns `res` unchanged.
///
/// This variant is compiled when the `opentelemetry` feature is disabled, so call sites
/// can call `add_context_if_otel` regardless of which features are enabled.
#[cfg(not(feature = "opentelemetry"))]
#[inline]
pub const fn add_context_if_otel<T>(res: T) -> T {
    res
}