use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::{FutureExt, Stream};
use pyo3::prelude::*;
mod async_generator;
pub mod asyncio;
mod coroutine;
pub mod sniffio;
pub mod trio;
mod utils;
/// Future-like object that is polled with a [`Python`] token and yields a
/// `PyResult<PyObject>`.
///
/// Implementors must be [`Send`].
pub trait PyFuture: Send {
/// Polls this future using the GIL token `py` and the task context `cx`.
///
/// Returns `Poll::Pending` while the future is not complete, otherwise
/// `Poll::Ready` holding either the resulting Python object or a `PyErr`.
fn poll_py(self: Pin<&mut Self>, py: Python, cx: &mut Context) -> Poll<PyResult<PyObject>>;
}
impl<F, T, E> PyFuture for F
where
F: Future<Output = Result<T, E>> + Send,
T: IntoPy<PyObject> + Send,
E: Send,
PyErr: From<E>,
{
fn poll_py(self: Pin<&mut Self>, py: Python, cx: &mut Context) -> Poll<PyResult<PyObject>> {
let waker = cx.waker();
py.allow_threads(|| Future::poll(self, &mut Context::from_waker(waker)))
.map_ok(|ok| ok.into_py(py))
.map_err(PyErr::from)
}
}
/// Stream-like object that is polled with a [`Python`] token and yields
/// `PyResult<PyObject>` items.
///
/// Implementors must be [`Send`].
pub trait PyStream: Send {
/// Polls for the next item using the GIL token `py` and the task context `cx`.
///
/// Returns `Poll::Pending` when no item is available yet, `Poll::Ready(None)`
/// when the stream yields `None`, and `Poll::Ready(Some(..))` with either a
/// Python object or a `PyErr` otherwise.
fn poll_next_py(
self: Pin<&mut Self>,
py: Python,
cx: &mut Context,
) -> Poll<Option<PyResult<PyObject>>>;
}
impl<S, T, E> PyStream for S
where
S: Stream<Item = Result<T, E>> + Send,
T: IntoPy<PyObject> + Send,
E: Send,
PyErr: From<E>,
{
fn poll_next_py(
self: Pin<&mut Self>,
py: Python,
cx: &mut Context,
) -> Poll<Option<PyResult<PyObject>>> {
let waker = cx.waker();
py.allow_threads(|| Stream::poll_next(self, &mut Context::from_waker(waker)))
.map_ok(|ok| ok.into_py(py))
.map_err(PyErr::from)
}
}
/// Wrapper around a Python future-like object that can be awaited from Rust.
///
/// The wrapped object is driven through its `done`, `result` and
/// `add_done_callback` methods when polled, and through `cancel` when the
/// wrapper is dropped with a cancellation policy still set.
#[derive(Debug)]
pub struct FutureWrapper {
// The wrapped Python object.
future: PyObject,
// Cancellation policy applied on drop; `None` means the wrapper does not
// call `cancel` when dropped.
cancel_on_drop: Option<CancelOnDrop>,
}
/// How an error raised by the Python `cancel` call is handled when a
/// [`FutureWrapper`] is dropped.
#[derive(Debug, Copy, Clone)]
pub enum CancelOnDrop {
/// Discard any error returned by `cancel`.
IgnoreError,
/// Panic if `cancel` returns an error.
PanicOnError,
}
impl FutureWrapper {
    /// Builds a wrapper around `future`.
    ///
    /// `cancel_on_drop` selects whether (and how) the Python object's `cancel`
    /// method is invoked when the wrapper is dropped; `None` disables it.
    pub fn new(future: impl Into<PyObject>, cancel_on_drop: Option<CancelOnDrop>) -> Self {
        let future = future.into();
        Self {
            future,
            cancel_on_drop,
        }
    }

    /// Borrows the wrapper together with an already-acquired GIL token,
    /// returning an `Unpin` future that polls the Python object using `py`.
    pub fn as_mut<'a>(
        &'a mut self,
        py: Python<'a>,
    ) -> impl Future<Output = PyResult<PyObject>> + Unpin + 'a {
        utils::WithGil { py, inner: self }
    }
}
/// Polls a borrowed [`FutureWrapper`] using the GIL token carried by
/// `utils::WithGil`.
impl<'a> Future for utils::WithGil<'_, &'a mut FutureWrapper> {
    type Output = PyResult<PyObject>;

    /// Asks the Python object whether it is `done`.
    ///
    /// If it is not, a wake callback is registered through `add_done_callback`
    /// and `Poll::Pending` is returned. Otherwise drop-time cancellation is
    /// disarmed and the outcome of `result()` is returned. Any Python error
    /// raised by `done` or `add_done_callback` is returned as `Poll::Ready(Err(..))`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let py = self.py;
        let finished = self.inner.future.call_method0(py, "done")?.is_true(py)?;
        if !finished {
            // Hand the current waker to Python so completion wakes this task.
            let waker = cx.waker().clone();
            let on_done = utils::WakeCallback(Some(waker));
            self.inner
                .future
                .call_method1(py, "add_done_callback", (on_done,))?;
            return Poll::Pending;
        }
        // The future has completed, so the wrapper must not cancel it on drop.
        self.inner.cancel_on_drop = None;
        Poll::Ready(self.inner.future.call_method0(py, "result"))
    }
}
/// Allows a [`FutureWrapper`] to be awaited directly; the GIL is acquired on
/// every poll.
impl Future for FutureWrapper {
    type Output = PyResult<PyObject>;

    /// Acquires the GIL and delegates to the future returned by
    /// [`FutureWrapper::as_mut`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let wrapper = self.get_mut();
        Python::with_gil(|py| {
            let mut with_gil = wrapper.as_mut(py);
            with_gil.poll_unpin(cx)
        })
    }
}
/// Cancels the wrapped Python object on drop when a cancellation policy is
/// still set.
impl Drop for FutureWrapper {
    /// Calls the Python object's `cancel` method under the GIL if
    /// `cancel_on_drop` is `Some`.
    ///
    /// # Panics
    ///
    /// Panics when `cancel` raises and the policy is
    /// [`CancelOnDrop::PanicOnError`].
    fn drop(&mut self) {
        if let Some(policy) = self.cancel_on_drop {
            let outcome = Python::with_gil(|py| self.future.call_method0(py, "cancel"));
            match (policy, outcome) {
                (CancelOnDrop::PanicOnError, Err(err)) => {
                    panic!("Cancel error while dropping FutureWrapper: {err:?}");
                }
                // Success, or an error under the ignore policy: nothing to do.
                (CancelOnDrop::PanicOnError, Ok(_)) | (CancelOnDrop::IgnoreError, _) => {}
            }
        }
    }
}
/// Boxed, `Send` callback invoked with a GIL token and an optional [`PyErr`].
///
/// NOTE(review): presumably called when an exception is thrown into (or the
/// Python side closes) a wrapped coroutine/generator — confirm against the
/// call sites in the submodules.
pub type ThrowCallback = Box<dyn FnMut(Python, Option<PyErr>) + Send>;