pyo3_async_runtimes/
generic.rs

1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! > are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-async-runtimes]
12//! version = "0.24"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17    future::Future,
18    pin::Pin,
19    sync::{Arc, Mutex},
20    task::{Context, Poll},
21};
22
23use crate::{
24    asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
25    get_running_loop, into_future_with_locals, TaskLocals,
26};
27use futures::channel::oneshot;
28#[cfg(feature = "unstable-streams")]
29use futures::{channel::mpsc, SinkExt};
30use pin_project_lite::pin_project;
31use pyo3::prelude::*;
32use pyo3::IntoPyObjectExt;
33#[cfg(feature = "unstable-streams")]
34use std::marker::PhantomData;
35
36/// Generic utilities for a JoinError
37pub trait JoinError {
38    /// Check if the spawned task exited because of a panic
39    fn is_panic(&self) -> bool;
40    /// Get the panic object associated with the error.  Panics if `is_panic` is not true.
41    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static>;
42}
43
44/// Generic Rust async/await runtime
45pub trait Runtime: Send + 'static {
46    /// The error returned by a JoinHandle after being awaited
47    type JoinError: JoinError + Send;
48    /// A future that completes with the result of the spawned task
49    type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
50
51    /// Spawn a future onto this runtime's event loop
52    fn spawn<F>(fut: F) -> Self::JoinHandle
53    where
54        F: Future<Output = ()> + Send + 'static;
55}
56
57/// Extension trait for async/await runtimes that support spawning local tasks
58pub trait SpawnLocalExt: Runtime {
59    /// Spawn a !Send future onto this runtime's event loop
60    fn spawn_local<F>(fut: F) -> Self::JoinHandle
61    where
62        F: Future<Output = ()> + 'static;
63}
64
65/// Exposes the utilities necessary for using task-local data in the Runtime
66pub trait ContextExt: Runtime {
67    /// Set the task locals for the given future
68    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
69    where
70        F: Future<Output = R> + Send + 'static;
71
72    /// Get the task locals for the current task
73    fn get_task_locals() -> Option<TaskLocals>;
74}
75
76/// Adds the ability to scope task-local data for !Send futures
77pub trait LocalContextExt: Runtime {
78    /// Set the task locals for the given !Send future
79    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
80    where
81        F: Future<Output = R> + 'static;
82}
83
84/// Get the current event loop from either Python or Rust async task local context
85///
86/// This function first checks if the runtime has a task-local reference to the Python event loop.
87/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
88/// with the current OS thread.
89pub fn get_current_loop<R>(py: Python) -> PyResult<Bound<PyAny>>
90where
91    R: ContextExt,
92{
93    if let Some(locals) = R::get_task_locals() {
94        Ok(locals.event_loop.into_bound(py))
95    } else {
96        get_running_loop(py)
97    }
98}
99
100/// Either copy the task locals from the current task OR get the current running loop and
101/// contextvars from Python.
102pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
103where
104    R: ContextExt,
105{
106    if let Some(locals) = R::get_task_locals() {
107        Ok(locals)
108    } else {
109        Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
110    }
111}
112
113/// Run the event loop until the given Future completes
114///
115/// After this function returns, the event loop can be resumed with [`run_until_complete`]
116///
117/// # Arguments
118/// * `event_loop` - The Python event loop that should run the future
119/// * `fut` - The future to drive to completion
120///
121/// # Examples
122///
123/// ```no_run
124/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
125/// #
126/// # use pyo3_async_runtimes::{
127/// #     TaskLocals,
128/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
129/// # };
130/// #
131/// # struct MyCustomJoinError;
132/// #
133/// # impl JoinError for MyCustomJoinError {
134/// #     fn is_panic(&self) -> bool {
135/// #         unreachable!()
136/// #     }
137/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
138/// #         unreachable!()
139/// #     }
140/// # }
141/// #
142/// # struct MyCustomJoinHandle;
143/// #
144/// # impl Future for MyCustomJoinHandle {
145/// #     type Output = Result<(), MyCustomJoinError>;
146/// #
147/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
148/// #         unreachable!()
149/// #     }
150/// # }
151/// #
152/// # struct MyCustomRuntime;
153/// #
154/// # impl Runtime for MyCustomRuntime {
155/// #     type JoinError = MyCustomJoinError;
156/// #     type JoinHandle = MyCustomJoinHandle;
157/// #
158/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
159/// #     where
160/// #         F: Future<Output = ()> + Send + 'static
161/// #     {
162/// #         unreachable!()
163/// #     }
164/// # }
165/// #
166/// # impl ContextExt for MyCustomRuntime {
167/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
168/// #     where
169/// #         F: Future<Output = R> + Send + 'static
170/// #     {
171/// #         unreachable!()
172/// #     }
173/// #     fn get_task_locals() -> Option<TaskLocals> {
174/// #         unreachable!()
175/// #     }
176/// # }
177/// #
178/// # use std::time::Duration;
179/// #
180/// # use pyo3::prelude::*;
181/// #
182/// # Python::attach(|py| -> PyResult<()> {
183/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
184/// # #[cfg(feature = "tokio-runtime")]
185/// pyo3_async_runtimes::generic::run_until_complete::<MyCustomRuntime, _, _>(&event_loop, async move {
186///     tokio::time::sleep(Duration::from_secs(1)).await;
187///     Ok(())
188/// })?;
189/// # Ok(())
190/// # }).unwrap();
191/// ```
192pub fn run_until_complete<R, F, T>(event_loop: &Bound<PyAny>, fut: F) -> PyResult<T>
193where
194    R: Runtime + ContextExt,
195    F: Future<Output = PyResult<T>> + Send + 'static,
196    T: Send + Sync + 'static,
197{
198    let py = event_loop.py();
199    let result_tx = Arc::new(Mutex::new(None));
200    let result_rx = Arc::clone(&result_tx);
201    let coro = future_into_py_with_locals::<R, _, ()>(
202        py,
203        TaskLocals::new(event_loop.clone()).copy_context(py)?,
204        async move {
205            let val = fut.await?;
206            if let Ok(mut result) = result_tx.lock() {
207                *result = Some(val);
208            }
209            Ok(())
210        },
211    )?;
212
213    event_loop.call_method1("run_until_complete", (coro,))?;
214
215    let result = result_rx.lock().unwrap().take().unwrap();
216    Ok(result)
217}
218
219/// Run the event loop until the given Future completes
220///
221/// # Arguments
222/// * `py` - The current PyO3 GIL guard
223/// * `fut` - The future to drive to completion
224///
225/// # Examples
226///
227/// ```no_run
228/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
229/// #
230/// # use pyo3_async_runtimes::{
231/// #     TaskLocals,
232/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
233/// # };
234/// #
235/// # struct MyCustomJoinError;
236/// #
237/// # impl JoinError for MyCustomJoinError {
238/// #     fn is_panic(&self) -> bool {
239/// #         unreachable!()
240/// #     }
241/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
242/// #         unreachable!()
243/// #     }
244/// # }
245/// #
246/// # struct MyCustomJoinHandle;
247/// #
248/// # impl Future for MyCustomJoinHandle {
249/// #     type Output = Result<(), MyCustomJoinError>;
250/// #
251/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
252/// #         unreachable!()
253/// #     }
254/// # }
255/// #
256/// # struct MyCustomRuntime;
257/// #
258/// # impl Runtime for MyCustomRuntime {
259/// #     type JoinError = MyCustomJoinError;
260/// #     type JoinHandle = MyCustomJoinHandle;
261/// #
262/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
263/// #     where
264/// #         F: Future<Output = ()> + Send + 'static
265/// #     {
266/// #         unreachable!()
267/// #     }
268/// # }
269/// #
270/// # impl ContextExt for MyCustomRuntime {
271/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
272/// #     where
273/// #         F: Future<Output = R> + Send + 'static
274/// #     {
275/// #         unreachable!()
276/// #     }
277/// #     fn get_task_locals() -> Option<TaskLocals> {
278/// #         unreachable!()
279/// #     }
280/// # }
281/// #
282/// # use std::time::Duration;
283/// # async fn custom_sleep(_duration: Duration) { }
284/// #
285/// # use pyo3::prelude::*;
286/// #
287/// fn main() {
288///     Python::attach(|py| {
289///         pyo3_async_runtimes::generic::run::<MyCustomRuntime, _, _>(py, async move {
290///             custom_sleep(Duration::from_secs(1)).await;
291///             Ok(())
292///         })
293///         .map_err(|e| {
294///             e.print_and_set_sys_last_vars(py);
295///         })
296///         .unwrap();
297///     })
298/// }
299/// ```
300pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
301where
302    R: Runtime + ContextExt,
303    F: Future<Output = PyResult<T>> + Send + 'static,
304    T: Send + Sync + 'static,
305{
306    let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
307
308    let result = run_until_complete::<R, F, T>(&event_loop, fut);
309
310    close(event_loop)?;
311
312    result
313}
314
315fn cancelled(future: &Bound<PyAny>) -> PyResult<bool> {
316    future.getattr("cancelled")?.call0()?.is_truthy()
317}
318
319#[pyclass]
320struct CheckedCompletor;
321
322#[pymethods]
323impl CheckedCompletor {
324    fn __call__(
325        &self,
326        future: &Bound<PyAny>,
327        complete: &Bound<PyAny>,
328        value: &Bound<PyAny>,
329    ) -> PyResult<()> {
330        if cancelled(future)? {
331            return Ok(());
332        }
333
334        complete.call1((value,))?;
335
336        Ok(())
337    }
338}
339
340fn set_result(
341    event_loop: &Bound<PyAny>,
342    future: &Bound<PyAny>,
343    result: PyResult<Py<PyAny>>,
344) -> PyResult<()> {
345    let py = event_loop.py();
346    let none = py.None().into_bound(py);
347
348    let (complete, val) = match result {
349        Ok(val) => (future.getattr("set_result")?, val.into_pyobject(py)?),
350        Err(err) => (future.getattr("set_exception")?, err.into_bound_py_any(py)?),
351    };
352    call_soon_threadsafe(event_loop, &none, (CheckedCompletor, future, complete, val))?;
353
354    Ok(())
355}
356
357/// Convert a Python `awaitable` into a Rust Future
358///
359/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
360/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
361/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
362///
363/// # Arguments
364/// * `awaitable` - The Python `awaitable` to be converted
365///
366/// # Examples
367///
368/// ```no_run
369/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
370/// # use std::ffi::CString;
371/// #
372/// # use pyo3::prelude::*;
373/// #
374/// # use pyo3_async_runtimes::{
375/// #     TaskLocals,
376/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
377/// # };
378/// #
379/// # struct MyCustomJoinError;
380/// #
381/// # impl JoinError for MyCustomJoinError {
382/// #     fn is_panic(&self) -> bool {
383/// #         unreachable!()
384/// #     }
385/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
386/// #         unreachable!()
387/// #     }
388/// # }
389/// #
390/// # struct MyCustomJoinHandle;
391/// #
392/// # impl Future for MyCustomJoinHandle {
393/// #     type Output = Result<(), MyCustomJoinError>;
394/// #
395/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
396/// #         unreachable!()
397/// #     }
398/// # }
399/// #
400/// # struct MyCustomRuntime;
401/// #
402/// # impl MyCustomRuntime {
403/// #     async fn sleep(_: Duration) {
404/// #         unreachable!()
405/// #     }
406/// # }
407/// #
408/// # impl Runtime for MyCustomRuntime {
409/// #     type JoinError = MyCustomJoinError;
410/// #     type JoinHandle = MyCustomJoinHandle;
411/// #
412/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
413/// #     where
414/// #         F: Future<Output = ()> + Send + 'static
415/// #     {
416/// #         unreachable!()
417/// #     }
418/// # }
419/// #
420/// # impl ContextExt for MyCustomRuntime {
421/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
422/// #     where
423/// #         F: Future<Output = R> + Send + 'static
424/// #     {
425/// #         unreachable!()
426/// #     }
427/// #     fn get_task_locals() -> Option<TaskLocals> {
428/// #         unreachable!()
429/// #     }
430/// # }
431/// #
432/// const PYTHON_CODE: &'static str = r#"
433/// import asyncio
434///
435/// async def py_sleep(duration):
436///     await asyncio.sleep(duration)
437/// "#;
438///
439/// async fn py_sleep(seconds: f32) -> PyResult<()> {
440///     let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
441///         Ok(
442///             PyModule::from_code(
443///                 py,
444///                 &CString::new(PYTHON_CODE).unwrap(),
445///                 &CString::new("test_into_future/test_mod.py").unwrap(),
446///                 &CString::new("test_mod").unwrap(),
447///             )?
448///             .into()
449///         )
450///     })?;
451///
452///     Python::attach(|py| {
453///         pyo3_async_runtimes::generic::into_future::<MyCustomRuntime>(
454///             test_mod
455///                 .call_method1(py, "py_sleep", (seconds,))?
456///                 .into_bound(py),
457///         )
458///     })?
459///     .await?;
460///     Ok(())
461/// }
462/// ```
463pub fn into_future<R>(
464    awaitable: Bound<PyAny>,
465) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send>
466where
467    R: Runtime + ContextExt,
468{
469    into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
470}
471
472/// Convert a Rust Future into a Python awaitable with a generic runtime
473///
474/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
475/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
476///
477/// Python `contextvars` are preserved when calling async Python functions within the Rust future
478/// via [`into_future`] (new behaviour in `v0.15`).
479///
480/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
481/// > unfortunately fail to resolve them when called within the Rust future. This is because the
482/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
483/// >
484/// > As a workaround, you can get the `contextvars` from the current task locals using
485/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
486/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
487/// > synchronous function, and restore the previous context when it returns or raises an exception.
488///
489/// # Arguments
490/// * `py` - PyO3 GIL guard
491/// * `locals` - The task-local data for Python
492/// * `fut` - The Rust future to be converted
493///
494/// # Examples
495///
496/// ```no_run
497/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
498/// #
499/// # use pyo3_async_runtimes::{
500/// #     TaskLocals,
501/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
502/// # };
503/// #
504/// # struct MyCustomJoinError;
505/// #
506/// # impl JoinError for MyCustomJoinError {
507/// #     fn is_panic(&self) -> bool {
508/// #         unreachable!()
509/// #     }
510/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
511/// #         unreachable!()
512/// #     }
513/// # }
514/// #
515/// # struct MyCustomJoinHandle;
516/// #
517/// # impl Future for MyCustomJoinHandle {
518/// #     type Output = Result<(), MyCustomJoinError>;
519/// #
520/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
521/// #         unreachable!()
522/// #     }
523/// # }
524/// #
525/// # struct MyCustomRuntime;
526/// #
527/// # impl MyCustomRuntime {
528/// #     async fn sleep(_: Duration) {
529/// #         unreachable!()
530/// #     }
531/// # }
532/// #
533/// # impl Runtime for MyCustomRuntime {
534/// #     type JoinError = MyCustomJoinError;
535/// #     type JoinHandle = MyCustomJoinHandle;
536/// #
537/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
538/// #     where
539/// #         F: Future<Output = ()> + Send + 'static
540/// #     {
541/// #         unreachable!()
542/// #     }
543/// # }
544/// #
545/// # impl ContextExt for MyCustomRuntime {
546/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
547/// #     where
548/// #         F: Future<Output = R> + Send + 'static
549/// #     {
550/// #         unreachable!()
551/// #     }
552/// #     fn get_task_locals() -> Option<TaskLocals> {
553/// #         unreachable!()
554/// #     }
555/// # }
556/// #
557/// use std::time::Duration;
558///
559/// use pyo3::prelude::*;
560///
561/// /// Awaitable sleep function
562/// #[pyfunction]
563/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
564///     let secs = secs.extract()?;
565///     pyo3_async_runtimes::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
566///         py,
567///         pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
568///         async move {
569///             MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
570///             Ok(())
571///         }
572///     )
573/// }
574/// ```
575#[allow(unused_must_use)]
576pub fn future_into_py_with_locals<R, F, T>(
577    py: Python,
578    locals: TaskLocals,
579    fut: F,
580) -> PyResult<Bound<PyAny>>
581where
582    R: Runtime + ContextExt,
583    F: Future<Output = PyResult<T>> + Send + 'static,
584    T: for<'py> IntoPyObject<'py>,
585{
586    let (cancel_tx, cancel_rx) = oneshot::channel();
587
588    let py_fut = create_future(locals.event_loop.bind(py).clone())?;
589    py_fut.call_method1(
590        "add_done_callback",
591        (PyDoneCallback {
592            cancel_tx: Some(cancel_tx),
593        },),
594    )?;
595
596    let future_tx1: Py<PyAny> = py_fut.clone().into();
597    let future_tx2 = future_tx1.clone_ref(py);
598
599    R::spawn(async move {
600        let locals2 = Python::attach(|py| locals.clone_ref(py));
601
602        if let Err(e) = R::spawn(async move {
603            let result = R::scope(
604                Python::attach(|py| locals2.clone_ref(py)),
605                Cancellable::new_with_cancel_rx(fut, cancel_rx),
606            )
607            .await;
608
609            Python::attach(move |py| {
610                if cancelled(future_tx1.bind(py))
611                    .map_err(dump_err(py))
612                    .unwrap_or(false)
613                {
614                    return;
615                }
616
617                let _ = set_result(
618                    &locals2.event_loop(py),
619                    future_tx1.bind(py),
620                    result.and_then(|val| val.into_py_any(py)),
621                )
622                .map_err(dump_err(py));
623            });
624        })
625        .await
626        {
627            if e.is_panic() {
628                Python::attach(move |py| {
629                    if cancelled(future_tx2.bind(py))
630                        .map_err(dump_err(py))
631                        .unwrap_or(false)
632                    {
633                        return;
634                    }
635
636                    let panic_message = format!(
637                        "rust future panicked: {}",
638                        get_panic_message(&e.into_panic())
639                    );
640                    let _ = set_result(
641                        locals.event_loop.bind(py),
642                        future_tx2.bind(py),
643                        Err(RustPanic::new_err(panic_message)),
644                    )
645                    .map_err(dump_err(py));
646                });
647            }
648        }
649    });
650
651    Ok(py_fut)
652}
653
654fn get_panic_message(any: &dyn std::any::Any) -> &str {
655    if let Some(str_slice) = any.downcast_ref::<&str>() {
656        str_slice
657    } else if let Some(string) = any.downcast_ref::<String>() {
658        string.as_str()
659    } else {
660        "unknown error"
661    }
662}
663
664pin_project! {
665    /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
666    #[must_use = "futures do nothing unless you `.await` or poll them"]
667    #[derive(Debug)]
668    struct Cancellable<T> {
669        #[pin]
670        future: T,
671        #[pin]
672        cancel_rx: oneshot::Receiver<()>,
673
674        poll_cancel_rx: bool
675    }
676}
677
678impl<T> Cancellable<T> {
679    fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
680        Self {
681            future,
682            cancel_rx,
683
684            poll_cancel_rx: true,
685        }
686    }
687}
688
689impl<'py, F, T> Future for Cancellable<F>
690where
691    F: Future<Output = PyResult<T>>,
692    T: IntoPyObject<'py>,
693{
694    type Output = F::Output;
695
696    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
697        let this = self.project();
698
699        // First, try polling the future
700        if let Poll::Ready(v) = this.future.poll(cx) {
701            return Poll::Ready(v);
702        }
703
704        // Now check for cancellation
705        if *this.poll_cancel_rx {
706            match this.cancel_rx.poll(cx) {
707                Poll::Ready(Ok(())) => {
708                    *this.poll_cancel_rx = false;
709                    // The python future has already been cancelled, so this return value will never
710                    // be used.
711                    Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
712                        "unreachable",
713                    )))
714                }
715                Poll::Ready(Err(_)) => {
716                    *this.poll_cancel_rx = false;
717                    Poll::Pending
718                }
719                Poll::Pending => Poll::Pending,
720            }
721        } else {
722            Poll::Pending
723        }
724    }
725}
726
727#[pyclass]
728struct PyDoneCallback {
729    cancel_tx: Option<oneshot::Sender<()>>,
730}
731
732#[pymethods]
733impl PyDoneCallback {
734    pub fn __call__(&mut self, fut: &Bound<PyAny>) -> PyResult<()> {
735        let py = fut.py();
736
737        if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
738            let _ = self.cancel_tx.take().unwrap().send(());
739        }
740
741        Ok(())
742    }
743}
744
745/// Convert a Rust Future into a Python awaitable with a generic runtime
746///
747/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
748/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
749///
750/// Python `contextvars` are preserved when calling async Python functions within the Rust future
751/// via [`into_future`] (new behaviour in `v0.15`).
752///
753/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
754/// > unfortunately fail to resolve them when called within the Rust future. This is because the
755/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
756/// >
757/// > As a workaround, you can get the `contextvars` from the current task locals using
758/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
759/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
760/// > synchronous function, and restore the previous context when it returns or raises an exception.
761///
762/// # Arguments
763/// * `py` - The current PyO3 GIL guard
764/// * `fut` - The Rust future to be converted
765///
766/// # Examples
767///
768/// ```no_run
769/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
770/// #
771/// # use pyo3_async_runtimes::{
772/// #     TaskLocals,
773/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
774/// # };
775/// #
776/// # struct MyCustomJoinError;
777/// #
778/// # impl JoinError for MyCustomJoinError {
779/// #     fn is_panic(&self) -> bool {
780/// #         unreachable!()
781/// #     }
782/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
783/// #         unreachable!()
784/// #     }
785/// # }
786/// #
787/// # struct MyCustomJoinHandle;
788/// #
789/// # impl Future for MyCustomJoinHandle {
790/// #     type Output = Result<(), MyCustomJoinError>;
791/// #
792/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
793/// #         unreachable!()
794/// #     }
795/// # }
796/// #
797/// # struct MyCustomRuntime;
798/// #
799/// # impl MyCustomRuntime {
800/// #     async fn sleep(_: Duration) {
801/// #         unreachable!()
802/// #     }
803/// # }
804/// #
805/// # impl Runtime for MyCustomRuntime {
806/// #     type JoinError = MyCustomJoinError;
807/// #     type JoinHandle = MyCustomJoinHandle;
808/// #
809/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
810/// #     where
811/// #         F: Future<Output = ()> + Send + 'static
812/// #     {
813/// #         unreachable!()
814/// #     }
815/// # }
816/// #
817/// # impl ContextExt for MyCustomRuntime {
818/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
819/// #     where
820/// #         F: Future<Output = R> + Send + 'static
821/// #     {
822/// #         unreachable!()
823/// #     }
824/// #     fn get_task_locals() -> Option<TaskLocals> {
825/// #         unreachable!()
826/// #     }
827/// # }
828/// #
829/// use std::time::Duration;
830///
831/// use pyo3::prelude::*;
832///
833/// /// Awaitable sleep function
834/// #[pyfunction]
835/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
836///     let secs = secs.extract()?;
837///     pyo3_async_runtimes::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
838///         MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
839///         Ok(())
840///     })
841/// }
842/// ```
843pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
844where
845    R: Runtime + ContextExt,
846    F: Future<Output = PyResult<T>> + Send + 'static,
847    T: for<'py> IntoPyObject<'py>,
848{
849    future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
850}
851
852/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
853/// specification of task locals.
854///
855/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
856/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
857///
858/// Python `contextvars` are preserved when calling async Python functions within the Rust future
859/// via [`into_future`] (new behaviour in `v0.15`).
860///
861/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
862/// > unfortunately fail to resolve them when called within the Rust future. This is because the
863/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
864/// >
865/// > As a workaround, you can get the `contextvars` from the current task locals using
866/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
867/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
868/// > synchronous function, and restore the previous context when it returns or raises an exception.
869///
870/// # Arguments
871/// * `py` - PyO3 GIL guard
872/// * `locals` - The task locals for the future
873/// * `fut` - The Rust future to be converted
874///
875/// # Examples
876///
877/// ```no_run
878/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
879/// #
880/// # use pyo3_async_runtimes::{
881/// #     TaskLocals,
882/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
883/// # };
884/// #
885/// # struct MyCustomJoinError;
886/// #
887/// # impl JoinError for MyCustomJoinError {
888/// #     fn is_panic(&self) -> bool {
889/// #         unreachable!()
890/// #     }
891/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
892/// #         unreachable!()
893/// #     }
894/// # }
895/// #
896/// # struct MyCustomJoinHandle;
897/// #
898/// # impl Future for MyCustomJoinHandle {
899/// #     type Output = Result<(), MyCustomJoinError>;
900/// #
901/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
902/// #         unreachable!()
903/// #     }
904/// # }
905/// #
906/// # struct MyCustomRuntime;
907/// #
908/// # impl MyCustomRuntime {
909/// #     async fn sleep(_: Duration) {
910/// #         unreachable!()
911/// #     }
912/// # }
913/// #
914/// # impl Runtime for MyCustomRuntime {
915/// #     type JoinError = MyCustomJoinError;
916/// #     type JoinHandle = MyCustomJoinHandle;
917/// #
918/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
919/// #     where
920/// #         F: Future<Output = ()> + Send + 'static
921/// #     {
922/// #         unreachable!()
923/// #     }
924/// # }
925/// #
926/// # impl ContextExt for MyCustomRuntime {
927/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
928/// #     where
929/// #         F: Future<Output = R> + Send + 'static
930/// #     {
931/// #         unreachable!()
932/// #     }
933/// #     fn get_task_locals() -> Option<TaskLocals> {
934/// #         unreachable!()
935/// #     }
936/// # }
937/// #
938/// # impl SpawnLocalExt for MyCustomRuntime {
939/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
940/// #     where
941/// #         F: Future<Output = ()> + 'static
942/// #     {
943/// #         unreachable!()
944/// #     }
945/// # }
946/// #
947/// # impl LocalContextExt for MyCustomRuntime {
948/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
949/// #     where
950/// #         F: Future<Output = R> + 'static
951/// #     {
952/// #         unreachable!()
953/// #     }
954/// # }
955/// #
956/// use std::{rc::Rc, time::Duration};
957///
958/// use pyo3::prelude::*;
959///
960/// /// Awaitable sleep function
961/// #[pyfunction]
962/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
963///     // Rc is !Send so it cannot be passed into pyo3_async_runtimes::generic::future_into_py
964///     let secs = Rc::new(secs);
965///
966///     pyo3_async_runtimes::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
967///         py,
968///         pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
969///         async move {
970///             MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
971///             Ok(())
972///         }
973///     )
974/// }
975/// ```
976#[deprecated(
977    since = "0.18.0",
978    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
979)]
980#[allow(unused_must_use)]
981pub fn local_future_into_py_with_locals<R, F, T>(
982    py: Python,
983    locals: TaskLocals,
984    fut: F,
985) -> PyResult<Bound<PyAny>>
986where
987    R: Runtime + SpawnLocalExt + LocalContextExt,
988    F: Future<Output = PyResult<T>> + 'static,
989    T: for<'py> IntoPyObject<'py>,
990{
991    let (cancel_tx, cancel_rx) = oneshot::channel();
992
993    let py_fut = create_future(locals.event_loop.clone_ref(py).into_bound(py))?;
994    py_fut.call_method1(
995        "add_done_callback",
996        (PyDoneCallback {
997            cancel_tx: Some(cancel_tx),
998        },),
999    )?;
1000
1001    let future_tx1: Py<PyAny> = py_fut.clone().into();
1002    let future_tx2 = future_tx1.clone_ref(py);
1003
1004    R::spawn_local(async move {
1005        let locals2 = Python::attach(|py| locals.clone_ref(py));
1006
1007        if let Err(e) = R::spawn_local(async move {
1008            let result = R::scope_local(
1009                Python::attach(|py| locals2.clone_ref(py)),
1010                Cancellable::new_with_cancel_rx(fut, cancel_rx),
1011            )
1012            .await;
1013
1014            Python::attach(move |py| {
1015                if cancelled(future_tx1.bind(py))
1016                    .map_err(dump_err(py))
1017                    .unwrap_or(false)
1018                {
1019                    return;
1020                }
1021
1022                let _ = set_result(
1023                    locals2.event_loop.bind(py),
1024                    future_tx1.bind(py),
1025                    result.and_then(|val| val.into_py_any(py)),
1026                )
1027                .map_err(dump_err(py));
1028            });
1029        })
1030        .await
1031        {
1032            if e.is_panic() {
1033                Python::attach(move |py| {
1034                    if cancelled(future_tx2.bind(py))
1035                        .map_err(dump_err(py))
1036                        .unwrap_or(false)
1037                    {
1038                        return;
1039                    }
1040
1041                    let panic_message = format!(
1042                        "rust future panicked: {}",
1043                        get_panic_message(&e.into_panic())
1044                    );
1045                    let _ = set_result(
1046                        locals.event_loop.bind(py),
1047                        future_tx2.bind(py),
1048                        Err(RustPanic::new_err(panic_message)),
1049                    )
1050                    .map_err(dump_err(py));
1051                });
1052            }
1053        }
1054    });
1055
1056    Ok(py_fut)
1057}
1058
1059/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1060///
1061/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1062/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1063///
1064/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1065/// via [`into_future`] (new behaviour in `v0.15`).
1066///
1067/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
1068/// > unfortunately fail to resolve them when called within the Rust future. This is because the
1069/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
1070/// >
1071/// > As a workaround, you can get the `contextvars` from the current task locals using
1072/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
1073/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
1074/// > synchronous function, and restore the previous context when it returns or raises an exception.
1075///
1076/// # Arguments
1077/// * `py` - The current PyO3 GIL guard
1078/// * `fut` - The Rust future to be converted
1079///
1080/// # Examples
1081///
1082/// ```no_run
1083/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1084/// #
1085/// # use pyo3_async_runtimes::{
1086/// #     TaskLocals,
1087/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1088/// # };
1089/// #
1090/// # struct MyCustomJoinError;
1091/// #
1092/// # impl JoinError for MyCustomJoinError {
1093/// #     fn is_panic(&self) -> bool {
1094/// #         unreachable!()
1095/// #     }
1096/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1097/// #         unreachable!()
1098/// #     }
1099/// # }
1100/// #
1101/// # struct MyCustomJoinHandle;
1102/// #
1103/// # impl Future for MyCustomJoinHandle {
1104/// #     type Output = Result<(), MyCustomJoinError>;
1105/// #
1106/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1107/// #         unreachable!()
1108/// #     }
1109/// # }
1110/// #
1111/// # struct MyCustomRuntime;
1112/// #
1113/// # impl MyCustomRuntime {
1114/// #     async fn sleep(_: Duration) {
1115/// #         unreachable!()
1116/// #     }
1117/// # }
1118/// #
1119/// # impl Runtime for MyCustomRuntime {
1120/// #     type JoinError = MyCustomJoinError;
1121/// #     type JoinHandle = MyCustomJoinHandle;
1122/// #
1123/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1124/// #     where
1125/// #         F: Future<Output = ()> + Send + 'static
1126/// #     {
1127/// #         unreachable!()
1128/// #     }
1129/// # }
1130/// #
1131/// # impl ContextExt for MyCustomRuntime {
1132/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1133/// #     where
1134/// #         F: Future<Output = R> + Send + 'static
1135/// #     {
1136/// #         unreachable!()
1137/// #     }
1138/// #     fn get_task_locals() -> Option<TaskLocals> {
1139/// #         unreachable!()
1140/// #     }
1141/// # }
1142/// #
1143/// # impl SpawnLocalExt for MyCustomRuntime {
1144/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
1145/// #     where
1146/// #         F: Future<Output = ()> + 'static
1147/// #     {
1148/// #         unreachable!()
1149/// #     }
1150/// # }
1151/// #
1152/// # impl LocalContextExt for MyCustomRuntime {
1153/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1154/// #     where
1155/// #         F: Future<Output = R> + 'static
1156/// #     {
1157/// #         unreachable!()
1158/// #     }
1159/// # }
1160/// #
1161/// use std::{rc::Rc, time::Duration};
1162///
1163/// use pyo3::prelude::*;
1164///
1165/// /// Awaitable sleep function
1166/// #[pyfunction]
1167/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
1168///     // Rc is !Send so it cannot be passed into pyo3_async_runtimes::generic::future_into_py
1169///     let secs = Rc::new(secs);
1170///
1171///     pyo3_async_runtimes::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1172///         MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1173///         Ok(())
1174///     })
1175/// }
1176/// ```
1177#[deprecated(
1178    since = "0.18.0",
1179    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1180)]
1181#[allow(deprecated)]
1182pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
1183where
1184    R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1185    F: Future<Output = PyResult<T>> + 'static,
1186    T: for<'py> IntoPyObject<'py>,
1187{
1188    local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1189}
1190
1191/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1192///
1193/// **This API is marked as unstable** and is only available when the
1194/// `unstable-streams` crate feature is enabled. This comes with no
1195/// stability guarantees, and could be changed or removed at any time.
1196///
1197/// # Arguments
1198/// * `locals` - The current task locals
1199/// * `gen` - The Python async generator to be converted
1200///
1201/// # Examples
1202/// ```no_run
1203/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1204/// #
1205/// # use pyo3_async_runtimes::{
1206/// #     TaskLocals,
1207/// #     generic::{JoinError, ContextExt, Runtime}
1208/// # };
1209/// #
1210/// # struct MyCustomJoinError;
1211/// #
1212/// # impl JoinError for MyCustomJoinError {
1213/// #     fn is_panic(&self) -> bool {
1214/// #         unreachable!()
1215/// #     }
1216/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1217/// #         unreachable!()
1218/// #     }
1219/// # }
1220/// #
1221/// # struct MyCustomJoinHandle;
1222/// #
1223/// # impl Future for MyCustomJoinHandle {
1224/// #     type Output = Result<(), MyCustomJoinError>;
1225/// #
1226/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1227/// #         unreachable!()
1228/// #     }
1229/// # }
1230/// #
1231/// # struct MyCustomRuntime;
1232/// #
1233/// # impl Runtime for MyCustomRuntime {
1234/// #     type JoinError = MyCustomJoinError;
1235/// #     type JoinHandle = MyCustomJoinHandle;
1236/// #
1237/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1238/// #     where
1239/// #         F: Future<Output = ()> + Send + 'static
1240/// #     {
1241/// #         unreachable!()
1242/// #     }
1243/// # }
1244/// #
1245/// # impl ContextExt for MyCustomRuntime {
1246/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1247/// #     where
1248/// #         F: Future<Output = R> + Send + 'static
1249/// #     {
1250/// #         unreachable!()
1251/// #     }
1252/// #     fn get_task_locals() -> Option<TaskLocals> {
1253/// #         unreachable!()
1254/// #     }
1255/// # }
1256///
1257/// use pyo3::prelude::*;
1258/// use futures::{StreamExt, TryStreamExt};
1259/// use std::ffi::CString;
1260///
1261/// const TEST_MOD: &str = r#"
1262/// import asyncio
1263///
1264/// async def gen():
1265///     for i in range(10):
1266///         await asyncio.sleep(0.1)
1267///         yield i
1268/// "#;
1269///
1270/// # async fn test_async_gen() -> PyResult<()> {
1271/// let stream = Python::attach(|py| {
1272///     let test_mod = PyModule::from_code(
1273///         py,
1274///         &CString::new(TEST_MOD).unwrap(),
1275///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1276///         &CString::new("test_mod").unwrap(),
1277///     )?;
1278///
1279///     pyo3_async_runtimes::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1280///         pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
1281///         test_mod.call_method0("gen")?
1282///     )
1283/// })?;
1284///
1285/// let vals = stream
1286///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1287///     .try_collect::<Vec<i32>>()
1288///     .await?;
1289///
1290/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1291///
1292/// Ok(())
1293/// # }
1294/// ```
1295#[cfg(feature = "unstable-streams")]
1296#[allow(unused_must_use)] // False positive unused lint on `R::spawn`
1297pub fn into_stream_with_locals_v1<R>(
1298    locals: TaskLocals,
1299    gen: Bound<'_, PyAny>,
1300) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
1301where
1302    R: Runtime,
1303{
1304    let (tx, rx) = async_channel::bounded(1);
1305    let anext: Py<PyAny> = gen.getattr("__anext__")?.into();
1306
1307    R::spawn(async move {
1308        loop {
1309            let fut = Python::attach(|py| -> PyResult<_> {
1310                into_future_with_locals(&locals, anext.bind(py).call0()?)
1311            });
1312            let item = match fut {
1313                Ok(fut) => match fut.await {
1314                    Ok(item) => Ok(item),
1315                    Err(e) => {
1316                        let stop_iter = Python::attach(|py| {
1317                            e.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
1318                        });
1319
1320                        if stop_iter {
1321                            // end the iteration
1322                            break;
1323                        } else {
1324                            Err(e)
1325                        }
1326                    }
1327                },
1328                Err(e) => Err(e),
1329            };
1330
1331            if tx.send(item).await.is_err() {
1332                // receiving side was dropped
1333                break;
1334            }
1335        }
1336    });
1337
1338    Ok(rx)
1339}
1340
1341/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1342///
1343/// **This API is marked as unstable** and is only available when the
1344/// `unstable-streams` crate feature is enabled. This comes with no
1345/// stability guarantees, and could be changed or removed at any time.
1346///
1347/// # Arguments
1348/// * `gen` - The Python async generator to be converted
1349///
1350/// # Examples
1351/// ```no_run
1352/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1353/// #
1354/// # use pyo3_async_runtimes::{
1355/// #     TaskLocals,
1356/// #     generic::{JoinError, ContextExt, Runtime}
1357/// # };
1358/// #
1359/// # struct MyCustomJoinError;
1360/// #
1361/// # impl JoinError for MyCustomJoinError {
1362/// #     fn is_panic(&self) -> bool {
1363/// #         unreachable!()
1364/// #     }
1365/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1366/// #         unreachable!()
1367/// #     }
1368/// # }
1369/// #
1370/// # struct MyCustomJoinHandle;
1371/// #
1372/// # impl Future for MyCustomJoinHandle {
1373/// #     type Output = Result<(), MyCustomJoinError>;
1374/// #
1375/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1376/// #         unreachable!()
1377/// #     }
1378/// # }
1379/// #
1380/// # struct MyCustomRuntime;
1381/// #
1382/// # impl Runtime for MyCustomRuntime {
1383/// #     type JoinError = MyCustomJoinError;
1384/// #     type JoinHandle = MyCustomJoinHandle;
1385/// #
1386/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1387/// #     where
1388/// #         F: Future<Output = ()> + Send + 'static
1389/// #     {
1390/// #         unreachable!()
1391/// #     }
1392/// # }
1393/// #
1394/// # impl ContextExt for MyCustomRuntime {
1395/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1396/// #     where
1397/// #         F: Future<Output = R> + Send + 'static
1398/// #     {
1399/// #         unreachable!()
1400/// #     }
1401/// #     fn get_task_locals() -> Option<TaskLocals> {
1402/// #         unreachable!()
1403/// #     }
1404/// # }
1405///
1406/// use pyo3::prelude::*;
1407/// use futures::{StreamExt, TryStreamExt};
1408/// use std::ffi::CString;
1409///
1410/// const TEST_MOD: &str = r#"
1411/// import asyncio
1412///
1413/// async def gen():
1414///     for i in range(10):
1415///         await asyncio.sleep(0.1)
1416///         yield i
1417/// "#;
1418///
1419/// # async fn test_async_gen() -> PyResult<()> {
1420/// let stream = Python::attach(|py| {
1421///     let test_mod = PyModule::from_code(
1422///         py,
1423///         &CString::new(TEST_MOD).unwrap(),
1424///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1425///         &CString::new("test_mod").unwrap(),
1426///     )?;
1427///
1428///     pyo3_async_runtimes::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1429/// })?;
1430///
1431/// let vals = stream
1432///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1433///     .try_collect::<Vec<i32>>()
1434///     .await?;
1435///
1436/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1437///
1438/// Ok(())
1439/// # }
1440/// ```
1441#[cfg(feature = "unstable-streams")]
1442pub fn into_stream_v1<R>(
1443    gen: Bound<'_, PyAny>,
1444) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
1445where
1446    R: Runtime + ContextExt,
1447{
1448    into_stream_with_locals_v1::<R>(get_current_locals::<R>(gen.py())?, gen)
1449}
1450
1451trait Sender: Send + 'static {
1452    fn send(&mut self, py: Python, locals: TaskLocals, item: Py<PyAny>) -> PyResult<Py<PyAny>>;
1453    fn close(&mut self) -> PyResult<()>;
1454}
1455
1456#[cfg(feature = "unstable-streams")]
1457struct GenericSender<R>
1458where
1459    R: Runtime,
1460{
1461    runtime: PhantomData<R>,
1462    tx: mpsc::Sender<Py<PyAny>>,
1463}
1464
1465#[cfg(feature = "unstable-streams")]
1466impl<R> Sender for GenericSender<R>
1467where
1468    R: Runtime + ContextExt,
1469{
1470    fn send(&mut self, py: Python, locals: TaskLocals, item: Py<PyAny>) -> PyResult<Py<PyAny>> {
1471        match self.tx.try_send(item.clone_ref(py)) {
1472            Ok(_) => true.into_py_any(py),
1473            Err(e) => {
1474                if e.is_full() {
1475                    let mut tx = self.tx.clone();
1476
1477                    future_into_py_with_locals::<R, _, bool>(py, locals, async move {
1478                        if tx.flush().await.is_err() {
1479                            // receiving side disconnected
1480                            return Ok(false);
1481                        }
1482                        if tx.send(item).await.is_err() {
1483                            // receiving side disconnected
1484                            return Ok(false);
1485                        }
1486                        Ok(true)
1487                    })
1488                    .map(Bound::unbind)
1489                } else {
1490                    false.into_py_any(py)
1491                }
1492            }
1493        }
1494    }
1495    fn close(&mut self) -> PyResult<()> {
1496        self.tx.close_channel();
1497        Ok(())
1498    }
1499}
1500
1501#[pyclass]
1502struct SenderGlue {
1503    locals: TaskLocals,
1504    tx: Arc<Mutex<dyn Sender>>,
1505}
1506#[pymethods]
1507impl SenderGlue {
1508    pub fn send(&mut self, item: Py<PyAny>) -> PyResult<Py<PyAny>> {
1509        Python::attach(|py| {
1510            self.tx
1511                .lock()
1512                .unwrap()
1513                .send(py, self.locals.clone_ref(py), item)
1514        })
1515    }
1516    pub fn close(&mut self) -> PyResult<()> {
1517        self.tx.lock().unwrap().close()
1518    }
1519}
1520
1521#[cfg(feature = "unstable-streams")]
1522const STREAM_GLUE: &str = r#"
1523import asyncio
1524
1525async def forward(gen, sender):
1526    async for item in gen:
1527        should_continue = sender.send(item)
1528
1529        if asyncio.iscoroutine(should_continue):
1530            should_continue = await should_continue
1531
1532        if should_continue:
1533            continue
1534        else:
1535            break
1536
1537    sender.close()
1538"#;
1539
1540/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1541///
1542/// **This API is marked as unstable** and is only available when the
1543/// `unstable-streams` crate feature is enabled. This comes with no
1544/// stability guarantees, and could be changed or removed at any time.
1545///
1546/// # Arguments
1547/// * `locals` - The current task locals
1548/// * `gen` - The Python async generator to be converted
1549///
1550/// # Examples
1551/// ```no_run
1552/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1553/// #
1554/// # use pyo3_async_runtimes::{
1555/// #     TaskLocals,
1556/// #     generic::{JoinError, ContextExt, Runtime}
1557/// # };
1558/// #
1559/// # struct MyCustomJoinError;
1560/// #
1561/// # impl JoinError for MyCustomJoinError {
1562/// #     fn is_panic(&self) -> bool {
1563/// #         unreachable!()
1564/// #     }
1565/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1566/// #         unreachable!()
1567/// #     }
1568/// # }
1569/// #
1570/// # struct MyCustomJoinHandle;
1571/// #
1572/// # impl Future for MyCustomJoinHandle {
1573/// #     type Output = Result<(), MyCustomJoinError>;
1574/// #
1575/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1576/// #         unreachable!()
1577/// #     }
1578/// # }
1579/// #
1580/// # struct MyCustomRuntime;
1581/// #
1582/// # impl Runtime for MyCustomRuntime {
1583/// #     type JoinError = MyCustomJoinError;
1584/// #     type JoinHandle = MyCustomJoinHandle;
1585/// #
1586/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1587/// #     where
1588/// #         F: Future<Output = ()> + Send + 'static
1589/// #     {
1590/// #         unreachable!()
1591/// #     }
1592/// # }
1593/// #
1594/// # impl ContextExt for MyCustomRuntime {
1595/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1596/// #     where
1597/// #         F: Future<Output = R> + Send + 'static
1598/// #     {
1599/// #         unreachable!()
1600/// #     }
1601/// #     fn get_task_locals() -> Option<TaskLocals> {
1602/// #         unreachable!()
1603/// #     }
1604/// # }
1605///
1606/// use pyo3::prelude::*;
1607/// use futures::{StreamExt, TryStreamExt};
1608/// use std::ffi::CString;
1609///
1610/// const TEST_MOD: &str = r#"
1611/// import asyncio
1612///
1613/// async def gen():
1614///     for i in range(10):
1615///         await asyncio.sleep(0.1)
1616///         yield i
1617/// "#;
1618///
1619/// # async fn test_async_gen() -> PyResult<()> {
1620/// let stream = Python::attach(|py| {
1621///     let test_mod = PyModule::from_code(
1622///         py,
1623///         &CString::new(TEST_MOD).unwrap(),
1624///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1625///         &CString::new("test_mod").unwrap(),
1626///     )?;
1627///
1628///     pyo3_async_runtimes::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1629///         pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
1630///         test_mod.call_method0("gen")?
1631///     )
1632/// })?;
1633///
1634/// let vals = stream
1635///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1636///     .try_collect::<Vec<i32>>()
1637///     .await?;
1638///
1639/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1640///
1641/// Ok(())
1642/// # }
1643/// ```
1644#[cfg(feature = "unstable-streams")]
1645pub fn into_stream_with_locals_v2<R>(
1646    locals: TaskLocals,
1647    gen: Bound<'_, PyAny>,
1648) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
1649where
1650    R: Runtime + ContextExt,
1651{
1652    use std::ffi::CString;
1653
1654    use pyo3::sync::PyOnceLock;
1655
1656    static GLUE_MOD: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
1657    let py = gen.py();
1658    let glue = GLUE_MOD
1659        .get_or_try_init(py, || -> PyResult<Py<PyAny>> {
1660            Ok(PyModule::from_code(
1661                py,
1662                &CString::new(STREAM_GLUE).unwrap(),
1663                &CString::new("pyo3_async_runtimes/pyo3_async_runtimes_glue.py").unwrap(),
1664                &CString::new("pyo3_async_runtimes_glue").unwrap(),
1665            )?
1666            .into())
1667        })?
1668        .bind(py);
1669
1670    let (tx, rx) = mpsc::channel(10);
1671
1672    locals.event_loop(py).call_method1(
1673        "call_soon_threadsafe",
1674        (
1675            locals.event_loop(py).getattr("create_task")?,
1676            glue.call_method1(
1677                "forward",
1678                (
1679                    gen,
1680                    SenderGlue {
1681                        locals,
1682                        tx: Arc::new(Mutex::new(GenericSender {
1683                            runtime: PhantomData::<R>,
1684                            tx,
1685                        })),
1686                    },
1687                ),
1688            )?,
1689        ),
1690    )?;
1691    Ok(rx)
1692}
1693
1694/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1695///
1696/// **This API is marked as unstable** and is only available when the
1697/// `unstable-streams` crate feature is enabled. This comes with no
1698/// stability guarantees, and could be changed or removed at any time.
1699///
1700/// # Arguments
1701/// * `gen` - The Python async generator to be converted
1702///
1703/// # Examples
1704/// ```no_run
1705/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1706/// #
1707/// # use pyo3_async_runtimes::{
1708/// #     TaskLocals,
1709/// #     generic::{JoinError, ContextExt, Runtime}
1710/// # };
1711/// #
1712/// # struct MyCustomJoinError;
1713/// #
1714/// # impl JoinError for MyCustomJoinError {
1715/// #     fn is_panic(&self) -> bool {
1716/// #         unreachable!()
1717/// #     }
1718/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1719/// #         unreachable!()
1720/// #     }
1721/// # }
1722/// #
1723/// # struct MyCustomJoinHandle;
1724/// #
1725/// # impl Future for MyCustomJoinHandle {
1726/// #     type Output = Result<(), MyCustomJoinError>;
1727/// #
1728/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1729/// #         unreachable!()
1730/// #     }
1731/// # }
1732/// #
1733/// # struct MyCustomRuntime;
1734/// #
1735/// # impl Runtime for MyCustomRuntime {
1736/// #     type JoinError = MyCustomJoinError;
1737/// #     type JoinHandle = MyCustomJoinHandle;
1738/// #
1739/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1740/// #     where
1741/// #         F: Future<Output = ()> + Send + 'static
1742/// #     {
1743/// #         unreachable!()
1744/// #     }
1745/// # }
1746/// #
1747/// # impl ContextExt for MyCustomRuntime {
1748/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1749/// #     where
1750/// #         F: Future<Output = R> + Send + 'static
1751/// #     {
1752/// #         unreachable!()
1753/// #     }
1754/// #     fn get_task_locals() -> Option<TaskLocals> {
1755/// #         unreachable!()
1756/// #     }
1757/// # }
1758///
1759/// use pyo3::prelude::*;
1760/// use futures::{StreamExt, TryStreamExt};
1761/// use std::ffi::CString;
1762///
1763/// const TEST_MOD: &str = r#"
1764/// import asyncio
1765///
1766/// async def gen():
1767///     for i in range(10):
1768///         await asyncio.sleep(0.1)
1769///         yield i
1770/// "#;
1771///
1772/// # async fn test_async_gen() -> PyResult<()> {
1773/// let stream = Python::attach(|py| {
1774///     let test_mod = PyModule::from_code(
1775///         py,
1776///         &CString::new(TEST_MOD).unwrap(),
1777///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1778///         &CString::new("test_mod").unwrap(),
1779///     )?;
1780///
1781///     pyo3_async_runtimes::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1782/// })?;
1783///
1784/// let vals = stream
1785///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1786///     .try_collect::<Vec<i32>>()
1787///     .await?;
1788///
1789/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1790///
1791/// Ok(())
1792/// # }
1793/// ```
1794#[cfg(feature = "unstable-streams")]
1795pub fn into_stream_v2<R>(
1796    gen: Bound<'_, PyAny>,
1797) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
1798where
1799    R: Runtime + ContextExt,
1800{
1801    into_stream_with_locals_v2::<R>(get_current_locals::<R>(gen.py())?, gen)
1802}