pyo3_asyncio/
generic.rs

1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio]
12//! version = "0.20"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17    future::Future,
18    marker::PhantomData,
19    pin::Pin,
20    sync::{Arc, Mutex},
21    task::{Context, Poll},
22};
23
24use futures::{
25    channel::{mpsc, oneshot},
26    SinkExt,
27};
28use once_cell::sync::OnceCell;
29use pin_project_lite::pin_project;
30use pyo3::prelude::*;
31
32use crate::{
33    asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
34    get_running_loop, into_future_with_locals, TaskLocals,
35};
36
37/// Generic utilities for a JoinError
38pub trait JoinError {
39    /// Check if the spawned task exited because of a panic
40    fn is_panic(&self) -> bool;
41    /// Get the panic object associated with the error.  Panics if `is_panic` is not true.
42    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static>;
43}
44
45/// Generic Rust async/await runtime
46pub trait Runtime: Send + 'static {
47    /// The error returned by a JoinHandle after being awaited
48    type JoinError: JoinError + Send;
49    /// A future that completes with the result of the spawned task
50    type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
51
52    /// Spawn a future onto this runtime's event loop
53    fn spawn<F>(fut: F) -> Self::JoinHandle
54    where
55        F: Future<Output = ()> + Send + 'static;
56}
57
58/// Extension trait for async/await runtimes that support spawning local tasks
59pub trait SpawnLocalExt: Runtime {
60    /// Spawn a !Send future onto this runtime's event loop
61    fn spawn_local<F>(fut: F) -> Self::JoinHandle
62    where
63        F: Future<Output = ()> + 'static;
64}
65
66/// Exposes the utilities necessary for using task-local data in the Runtime
67pub trait ContextExt: Runtime {
68    /// Set the task locals for the given future
69    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
70    where
71        F: Future<Output = R> + Send + 'static;
72
73    /// Get the task locals for the current task
74    fn get_task_locals() -> Option<TaskLocals>;
75}
76
77/// Adds the ability to scope task-local data for !Send futures
78pub trait LocalContextExt: Runtime {
79    /// Set the task locals for the given !Send future
80    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
81    where
82        F: Future<Output = R> + 'static;
83}
84
85/// Get the current event loop from either Python or Rust async task local context
86///
87/// This function first checks if the runtime has a task-local reference to the Python event loop.
88/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
89/// with the current OS thread.
90pub fn get_current_loop<R>(py: Python) -> PyResult<&PyAny>
91where
92    R: ContextExt,
93{
94    if let Some(locals) = R::get_task_locals() {
95        Ok(locals.event_loop.into_ref(py))
96    } else {
97        get_running_loop(py)
98    }
99}
100
101/// Either copy the task locals from the current task OR get the current running loop and
102/// contextvars from Python.
103pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
104where
105    R: ContextExt,
106{
107    if let Some(locals) = R::get_task_locals() {
108        Ok(locals)
109    } else {
110        Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
111    }
112}
113
114/// Run the event loop until the given Future completes
115///
116/// After this function returns, the event loop can be resumed with [`run_until_complete`]
117///
118/// # Arguments
119/// * `event_loop` - The Python event loop that should run the future
120/// * `fut` - The future to drive to completion
121///
122/// # Examples
123///
124/// ```no_run
125/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
126/// #
127/// # use pyo3_asyncio::{
128/// #     TaskLocals,
129/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
130/// # };
131/// #
132/// # struct MyCustomJoinError;
133/// #
134/// # impl JoinError for MyCustomJoinError {
135/// #     fn is_panic(&self) -> bool {
136/// #         unreachable!()
137/// #     }
138/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
139/// #         unreachable!()
140/// #     }
141/// # }
142/// #
143/// # struct MyCustomJoinHandle;
144/// #
145/// # impl Future for MyCustomJoinHandle {
146/// #     type Output = Result<(), MyCustomJoinError>;
147/// #
148/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
149/// #         unreachable!()
150/// #     }
151/// # }
152/// #
153/// # struct MyCustomRuntime;
154/// #
155/// # impl Runtime for MyCustomRuntime {
156/// #     type JoinError = MyCustomJoinError;
157/// #     type JoinHandle = MyCustomJoinHandle;
158/// #
159/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
160/// #     where
161/// #         F: Future<Output = ()> + Send + 'static
162/// #     {
163/// #         unreachable!()
164/// #     }
165/// # }
166/// #
167/// # impl ContextExt for MyCustomRuntime {
168/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
169/// #     where
170/// #         F: Future<Output = R> + Send + 'static
171/// #     {
172/// #         unreachable!()
173/// #     }
174/// #     fn get_task_locals() -> Option<TaskLocals> {
175/// #         unreachable!()
176/// #     }
177/// # }
178/// #
179/// # use std::time::Duration;
180/// #
181/// # use pyo3::prelude::*;
182/// #
183/// # Python::with_gil(|py| -> PyResult<()> {
184/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
185/// # #[cfg(feature = "tokio-runtime")]
186/// pyo3_asyncio::generic::run_until_complete::<MyCustomRuntime, _, _>(event_loop, async move {
187///     tokio::time::sleep(Duration::from_secs(1)).await;
188///     Ok(())
189/// })?;
190/// # Ok(())
191/// # }).unwrap();
192/// ```
193pub fn run_until_complete<R, F, T>(event_loop: &PyAny, fut: F) -> PyResult<T>
194where
195    R: Runtime + ContextExt,
196    F: Future<Output = PyResult<T>> + Send + 'static,
197    T: Send + Sync + 'static,
198{
199    let py = event_loop.py();
200    let result_tx = Arc::new(Mutex::new(None));
201    let result_rx = Arc::clone(&result_tx);
202    let coro = future_into_py_with_locals::<R, _, ()>(
203        py,
204        TaskLocals::new(event_loop).copy_context(py)?,
205        async move {
206            let val = fut.await?;
207            if let Ok(mut result) = result_tx.lock() {
208                *result = Some(val);
209            }
210            Ok(())
211        },
212    )?;
213
214    event_loop.call_method1("run_until_complete", (coro,))?;
215
216    let result = result_rx.lock().unwrap().take().unwrap();
217    Ok(result)
218}
219
220/// Run the event loop until the given Future completes
221///
222/// # Arguments
223/// * `py` - The current PyO3 GIL guard
224/// * `fut` - The future to drive to completion
225///
226/// # Examples
227///
228/// ```no_run
229/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
230/// #
231/// # use pyo3_asyncio::{
232/// #     TaskLocals,
233/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
234/// # };
235/// #
236/// # struct MyCustomJoinError;
237/// #
238/// # impl JoinError for MyCustomJoinError {
239/// #     fn is_panic(&self) -> bool {
240/// #         unreachable!()
241/// #     }
242/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
243/// #         unreachable!()
244/// #     }
245/// # }
246/// #
247/// # struct MyCustomJoinHandle;
248/// #
249/// # impl Future for MyCustomJoinHandle {
250/// #     type Output = Result<(), MyCustomJoinError>;
251/// #
252/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
253/// #         unreachable!()
254/// #     }
255/// # }
256/// #
257/// # struct MyCustomRuntime;
258/// #
259/// # impl Runtime for MyCustomRuntime {
260/// #     type JoinError = MyCustomJoinError;
261/// #     type JoinHandle = MyCustomJoinHandle;
262/// #
263/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
264/// #     where
265/// #         F: Future<Output = ()> + Send + 'static
266/// #     {
267/// #         unreachable!()
268/// #     }
269/// # }
270/// #
271/// # impl ContextExt for MyCustomRuntime {
272/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
273/// #     where
274/// #         F: Future<Output = R> + Send + 'static
275/// #     {
276/// #         unreachable!()
277/// #     }
278/// #     fn get_task_locals() -> Option<TaskLocals> {
279/// #         unreachable!()
280/// #     }
281/// # }
282/// #
283/// # use std::time::Duration;
284/// # async fn custom_sleep(_duration: Duration) { }
285/// #
286/// # use pyo3::prelude::*;
287/// #
288/// fn main() {
289///     Python::with_gil(|py| {
290///         pyo3_asyncio::generic::run::<MyCustomRuntime, _, _>(py, async move {
291///             custom_sleep(Duration::from_secs(1)).await;
292///             Ok(())
293///         })
294///         .map_err(|e| {
295///             e.print_and_set_sys_last_vars(py);
296///         })
297///         .unwrap();
298///     })
299/// }
300/// ```
301pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
302where
303    R: Runtime + ContextExt,
304    F: Future<Output = PyResult<T>> + Send + 'static,
305    T: Send + Sync + 'static,
306{
307    let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
308
309    let result = run_until_complete::<R, F, T>(event_loop, fut);
310
311    close(event_loop)?;
312
313    result
314}
315
316fn cancelled(future: &PyAny) -> PyResult<bool> {
317    future.getattr("cancelled")?.call0()?.is_true()
318}
319
320#[pyclass]
321struct CheckedCompletor;
322
323#[pymethods]
324impl CheckedCompletor {
325    fn __call__(&self, future: &PyAny, complete: &PyAny, value: &PyAny) -> PyResult<()> {
326        if cancelled(future)? {
327            return Ok(());
328        }
329
330        complete.call1((value,))?;
331
332        Ok(())
333    }
334}
335
336fn set_result(event_loop: &PyAny, future: &PyAny, result: PyResult<PyObject>) -> PyResult<()> {
337    let py = event_loop.py();
338    let none = py.None().into_ref(py);
339
340    let (complete, val) = match result {
341        Ok(val) => (future.getattr("set_result")?, val.into_py(py)),
342        Err(err) => (future.getattr("set_exception")?, err.into_py(py)),
343    };
344    call_soon_threadsafe(event_loop, none, (CheckedCompletor, future, complete, val))?;
345
346    Ok(())
347}
348
349/// Convert a Python `awaitable` into a Rust Future
350///
351/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
352/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
353/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
354///
355/// # Arguments
356/// * `awaitable` - The Python `awaitable` to be converted
357///
358/// # Examples
359///
360/// ```no_run
361/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
362/// #
363/// # use pyo3::prelude::*;
364/// #
365/// # use pyo3_asyncio::{
366/// #     TaskLocals,
367/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
368/// # };
369/// #
370/// # struct MyCustomJoinError;
371/// #
372/// # impl JoinError for MyCustomJoinError {
373/// #     fn is_panic(&self) -> bool {
374/// #         unreachable!()
375/// #     }
376/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
377/// #         unreachable!()
378/// #     }
379/// # }
380/// #
381/// # struct MyCustomJoinHandle;
382/// #
383/// # impl Future for MyCustomJoinHandle {
384/// #     type Output = Result<(), MyCustomJoinError>;
385/// #
386/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
387/// #         unreachable!()
388/// #     }
389/// # }
390/// #
391/// # struct MyCustomRuntime;
392/// #
393/// # impl MyCustomRuntime {
394/// #     async fn sleep(_: Duration) {
395/// #         unreachable!()
396/// #     }
397/// # }
398/// #
399/// # impl Runtime for MyCustomRuntime {
400/// #     type JoinError = MyCustomJoinError;
401/// #     type JoinHandle = MyCustomJoinHandle;
402/// #
403/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
404/// #     where
405/// #         F: Future<Output = ()> + Send + 'static
406/// #     {
407/// #         unreachable!()
408/// #     }
409/// # }
410/// #
411/// # impl ContextExt for MyCustomRuntime {
412/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
413/// #     where
414/// #         F: Future<Output = R> + Send + 'static
415/// #     {
416/// #         unreachable!()
417/// #     }
418/// #     fn get_task_locals() -> Option<TaskLocals> {
419/// #         unreachable!()
420/// #     }
421/// # }
422/// #
423/// const PYTHON_CODE: &'static str = r#"
424/// import asyncio
425///
426/// async def py_sleep(duration):
427///     await asyncio.sleep(duration)
428/// "#;
429///
430/// async fn py_sleep(seconds: f32) -> PyResult<()> {
431///     let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
432///         Ok(
433///             PyModule::from_code(
434///                 py,
435///                 PYTHON_CODE,
436///                 "test_into_future/test_mod.py",
437///                 "test_mod"
438///             )?
439///             .into()
440///         )
441///     })?;
442///
443///     Python::with_gil(|py| {
444///         pyo3_asyncio::generic::into_future::<MyCustomRuntime>(
445///             test_mod
446///                 .call_method1(py, "py_sleep", (seconds.into_py(py),))?
447///                 .as_ref(py),
448///         )
449///     })?
450///     .await?;
451///     Ok(())
452/// }
453/// ```
454pub fn into_future<R>(
455    awaitable: &PyAny,
456) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
457where
458    R: Runtime + ContextExt,
459{
460    into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
461}
462
463/// Convert a Rust Future into a Python awaitable with a generic runtime
464///
465/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
466/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
467///
468/// Python `contextvars` are preserved when calling async Python functions within the Rust future
469/// via [`into_future`] (new behaviour in `v0.15`).
470///
471/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
472/// unfortunately fail to resolve them when called within the Rust future. This is because the
473/// function is being called from a Rust thread, not inside an actual Python coroutine context.
474/// >
475/// > As a workaround, you can get the `contextvars` from the current task locals using
476/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
477/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
478/// synchronous function, and restore the previous context when it returns or raises an exception.
479///
480/// # Arguments
481/// * `py` - PyO3 GIL guard
482/// * `locals` - The task-local data for Python
483/// * `fut` - The Rust future to be converted
484///
485/// # Examples
486///
487/// ```no_run
488/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
489/// #
490/// # use pyo3_asyncio::{
491/// #     TaskLocals,
492/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
493/// # };
494/// #
495/// # struct MyCustomJoinError;
496/// #
497/// # impl JoinError for MyCustomJoinError {
498/// #     fn is_panic(&self) -> bool {
499/// #         unreachable!()
500/// #     }
501/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
502/// #         unreachable!()
503/// #     }
504/// # }
505/// #
506/// # struct MyCustomJoinHandle;
507/// #
508/// # impl Future for MyCustomJoinHandle {
509/// #     type Output = Result<(), MyCustomJoinError>;
510/// #
511/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
512/// #         unreachable!()
513/// #     }
514/// # }
515/// #
516/// # struct MyCustomRuntime;
517/// #
518/// # impl MyCustomRuntime {
519/// #     async fn sleep(_: Duration) {
520/// #         unreachable!()
521/// #     }
522/// # }
523/// #
524/// # impl Runtime for MyCustomRuntime {
525/// #     type JoinError = MyCustomJoinError;
526/// #     type JoinHandle = MyCustomJoinHandle;
527/// #
528/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
529/// #     where
530/// #         F: Future<Output = ()> + Send + 'static
531/// #     {
532/// #         unreachable!()
533/// #     }
534/// # }
535/// #
536/// # impl ContextExt for MyCustomRuntime {
537/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
538/// #     where
539/// #         F: Future<Output = R> + Send + 'static
540/// #     {
541/// #         unreachable!()
542/// #     }
543/// #     fn get_task_locals() -> Option<TaskLocals> {
544/// #         unreachable!()
545/// #     }
546/// # }
547/// #
548/// use std::time::Duration;
549///
550/// use pyo3::prelude::*;
551///
552/// /// Awaitable sleep function
553/// #[pyfunction]
554/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
555///     let secs = secs.extract()?;
556///     pyo3_asyncio::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
557///         py,
558///         pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
559///         async move {
560///             MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
561///             Ok(())
562///         }
563///     )
564/// }
565/// ```
566pub fn future_into_py_with_locals<R, F, T>(
567    py: Python,
568    locals: TaskLocals,
569    fut: F,
570) -> PyResult<&PyAny>
571where
572    R: Runtime + ContextExt,
573    F: Future<Output = PyResult<T>> + Send + 'static,
574    T: IntoPy<PyObject>,
575{
576    let (cancel_tx, cancel_rx) = oneshot::channel();
577
578    let py_fut = create_future(locals.event_loop.clone().into_ref(py))?;
579    py_fut.call_method1(
580        "add_done_callback",
581        (PyDoneCallback {
582            cancel_tx: Some(cancel_tx),
583        },),
584    )?;
585
586    let future_tx1 = PyObject::from(py_fut);
587    let future_tx2 = future_tx1.clone();
588
589    R::spawn(async move {
590        let locals2 = locals.clone();
591
592        if let Err(e) = R::spawn(async move {
593            let result = R::scope(
594                locals2.clone(),
595                Cancellable::new_with_cancel_rx(fut, cancel_rx),
596            )
597            .await;
598
599            Python::with_gil(move |py| {
600                if cancelled(future_tx1.as_ref(py))
601                    .map_err(dump_err(py))
602                    .unwrap_or(false)
603                {
604                    return;
605                }
606
607                let _ = set_result(
608                    locals2.event_loop(py),
609                    future_tx1.as_ref(py),
610                    result.map(|val| val.into_py(py)),
611                )
612                .map_err(dump_err(py));
613            });
614        })
615        .await
616        {
617            if e.is_panic() {
618                Python::with_gil(move |py| {
619                    if cancelled(future_tx2.as_ref(py))
620                        .map_err(dump_err(py))
621                        .unwrap_or(false)
622                    {
623                        return;
624                    }
625
626                    let panic_message = format!(
627                        "rust future panicked: {}",
628                        get_panic_message(&e.into_panic())
629                    );
630                    let _ = set_result(
631                        locals.event_loop.as_ref(py),
632                        future_tx2.as_ref(py),
633                        Err(RustPanic::new_err(panic_message)),
634                    )
635                    .map_err(dump_err(py));
636                });
637            }
638        }
639    });
640
641    Ok(py_fut)
642}
643
644fn get_panic_message(any: &dyn std::any::Any) -> &str {
645    if let Some(str_slice) = any.downcast_ref::<&str>() {
646        str_slice
647    } else if let Some(string) = any.downcast_ref::<String>() {
648        string
649    } else {
650        "unknown error"
651    }
652}
653
654pin_project! {
655    /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
656    #[must_use = "futures do nothing unless you `.await` or poll them"]
657    #[derive(Debug)]
658    struct Cancellable<T> {
659        #[pin]
660        future: T,
661        #[pin]
662        cancel_rx: oneshot::Receiver<()>,
663
664        poll_cancel_rx: bool
665    }
666}
667
668impl<T> Cancellable<T> {
669    fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
670        Self {
671            future,
672            cancel_rx,
673
674            poll_cancel_rx: true,
675        }
676    }
677}
678
679impl<F, T> Future for Cancellable<F>
680where
681    F: Future<Output = PyResult<T>>,
682    T: IntoPy<PyObject>,
683{
684    type Output = F::Output;
685
686    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
687        let this = self.project();
688
689        // First, try polling the future
690        if let Poll::Ready(v) = this.future.poll(cx) {
691            return Poll::Ready(v);
692        }
693
694        // Now check for cancellation
695        if *this.poll_cancel_rx {
696            match this.cancel_rx.poll(cx) {
697                Poll::Ready(Ok(())) => {
698                    *this.poll_cancel_rx = false;
699                    // The python future has already been cancelled, so this return value will never
700                    // be used.
701                    Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
702                        "unreachable",
703                    )))
704                }
705                Poll::Ready(Err(_)) => {
706                    *this.poll_cancel_rx = false;
707                    Poll::Pending
708                }
709                Poll::Pending => Poll::Pending,
710            }
711        } else {
712            Poll::Pending
713        }
714    }
715}
716
717#[pyclass]
718struct PyDoneCallback {
719    cancel_tx: Option<oneshot::Sender<()>>,
720}
721
722#[pymethods]
723impl PyDoneCallback {
724    pub fn __call__(&mut self, fut: &PyAny) -> PyResult<()> {
725        let py = fut.py();
726
727        if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
728            let _ = self.cancel_tx.take().unwrap().send(());
729        }
730
731        Ok(())
732    }
733}
734
735/// Convert a Rust Future into a Python awaitable with a generic runtime
736///
737/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
738/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
739///
740/// Python `contextvars` are preserved when calling async Python functions within the Rust future
741/// via [`into_future`] (new behaviour in `v0.15`).
742///
743/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
744/// unfortunately fail to resolve them when called within the Rust future. This is because the
745/// function is being called from a Rust thread, not inside an actual Python coroutine context.
746/// >
747/// > As a workaround, you can get the `contextvars` from the current task locals using
748/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
749/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
750/// synchronous function, and restore the previous context when it returns or raises an exception.
751///
752/// # Arguments
753/// * `py` - The current PyO3 GIL guard
754/// * `fut` - The Rust future to be converted
755///
756/// # Examples
757///
758/// ```no_run
759/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
760/// #
761/// # use pyo3_asyncio::{
762/// #     TaskLocals,
763/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
764/// # };
765/// #
766/// # struct MyCustomJoinError;
767/// #
768/// # impl JoinError for MyCustomJoinError {
769/// #     fn is_panic(&self) -> bool {
770/// #         unreachable!()
771/// #     }
772/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
773/// #         unreachable!()
774/// #     }
775/// # }
776/// #
777/// # struct MyCustomJoinHandle;
778/// #
779/// # impl Future for MyCustomJoinHandle {
780/// #     type Output = Result<(), MyCustomJoinError>;
781/// #
782/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
783/// #         unreachable!()
784/// #     }
785/// # }
786/// #
787/// # struct MyCustomRuntime;
788/// #
789/// # impl MyCustomRuntime {
790/// #     async fn sleep(_: Duration) {
791/// #         unreachable!()
792/// #     }
793/// # }
794/// #
795/// # impl Runtime for MyCustomRuntime {
796/// #     type JoinError = MyCustomJoinError;
797/// #     type JoinHandle = MyCustomJoinHandle;
798/// #
799/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
800/// #     where
801/// #         F: Future<Output = ()> + Send + 'static
802/// #     {
803/// #         unreachable!()
804/// #     }
805/// # }
806/// #
807/// # impl ContextExt for MyCustomRuntime {
808/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
809/// #     where
810/// #         F: Future<Output = R> + Send + 'static
811/// #     {
812/// #         unreachable!()
813/// #     }
814/// #     fn get_task_locals() -> Option<TaskLocals> {
815/// #         unreachable!()
816/// #     }
817/// # }
818/// #
819/// use std::time::Duration;
820///
821/// use pyo3::prelude::*;
822///
823/// /// Awaitable sleep function
824/// #[pyfunction]
825/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
826///     let secs = secs.extract()?;
827///     pyo3_asyncio::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
828///         MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
829///         Ok(())
830///     })
831/// }
832/// ```
833pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<&PyAny>
834where
835    R: Runtime + ContextExt,
836    F: Future<Output = PyResult<T>> + Send + 'static,
837    T: IntoPy<PyObject>,
838{
839    future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
840}
841
842/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
843/// specification of task locals.
844///
845/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
846/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
847///
848/// Python `contextvars` are preserved when calling async Python functions within the Rust future
849/// via [`into_future`] (new behaviour in `v0.15`).
850///
851/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
852/// unfortunately fail to resolve them when called within the Rust future. This is because the
853/// function is being called from a Rust thread, not inside an actual Python coroutine context.
854/// >
855/// > As a workaround, you can get the `contextvars` from the current task locals using
856/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
857/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
858/// synchronous function, and restore the previous context when it returns or raises an exception.
859///
860/// # Arguments
861/// * `py` - PyO3 GIL guard
862/// * `locals` - The task locals for the future
863/// * `fut` - The Rust future to be converted
864///
865/// # Examples
866///
867/// ```no_run
868/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
869/// #
870/// # use pyo3_asyncio::{
871/// #     TaskLocals,
872/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
873/// # };
874/// #
875/// # struct MyCustomJoinError;
876/// #
877/// # impl JoinError for MyCustomJoinError {
878/// #     fn is_panic(&self) -> bool {
879/// #         unreachable!()
880/// #     }
881/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
882/// #         unreachable!()
883/// #     }
884/// # }
885/// #
886/// # struct MyCustomJoinHandle;
887/// #
888/// # impl Future for MyCustomJoinHandle {
889/// #     type Output = Result<(), MyCustomJoinError>;
890/// #
891/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
892/// #         unreachable!()
893/// #     }
894/// # }
895/// #
896/// # struct MyCustomRuntime;
897/// #
898/// # impl MyCustomRuntime {
899/// #     async fn sleep(_: Duration) {
900/// #         unreachable!()
901/// #     }
902/// # }
903/// #
904/// # impl Runtime for MyCustomRuntime {
905/// #     type JoinError = MyCustomJoinError;
906/// #     type JoinHandle = MyCustomJoinHandle;
907/// #
908/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
909/// #     where
910/// #         F: Future<Output = ()> + Send + 'static
911/// #     {
912/// #         unreachable!()
913/// #     }
914/// # }
915/// #
916/// # impl ContextExt for MyCustomRuntime {
917/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
918/// #     where
919/// #         F: Future<Output = R> + Send + 'static
920/// #     {
921/// #         unreachable!()
922/// #     }
923/// #     fn get_task_locals() -> Option<TaskLocals> {
924/// #         unreachable!()
925/// #     }
926/// # }
927/// #
928/// # impl SpawnLocalExt for MyCustomRuntime {
929/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
930/// #     where
931/// #         F: Future<Output = ()> + 'static
932/// #     {
933/// #         unreachable!()
934/// #     }
935/// # }
936/// #
937/// # impl LocalContextExt for MyCustomRuntime {
938/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
939/// #     where
940/// #         F: Future<Output = R> + 'static
941/// #     {
942/// #         unreachable!()
943/// #     }
944/// # }
945/// #
946/// use std::{rc::Rc, time::Duration};
947///
948/// use pyo3::prelude::*;
949///
950/// /// Awaitable sleep function
951/// #[pyfunction]
952/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
953///     // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py
954///     let secs = Rc::new(secs);
955///
956///     pyo3_asyncio::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
957///         py,
958///         pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
959///         async move {
960///             MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
961///             Ok(())
962///         }
963///     )
964/// }
965/// ```
966#[deprecated(
967    since = "0.18.0",
968    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
969)]
970pub fn local_future_into_py_with_locals<R, F, T>(
971    py: Python,
972    locals: TaskLocals,
973    fut: F,
974) -> PyResult<&PyAny>
975where
976    R: Runtime + SpawnLocalExt + LocalContextExt,
977    F: Future<Output = PyResult<T>> + 'static,
978    T: IntoPy<PyObject>,
979{
980    let (cancel_tx, cancel_rx) = oneshot::channel();
981
982    let py_fut = create_future(locals.event_loop.clone().into_ref(py))?;
983    py_fut.call_method1(
984        "add_done_callback",
985        (PyDoneCallback {
986            cancel_tx: Some(cancel_tx),
987        },),
988    )?;
989
990    let future_tx1 = PyObject::from(py_fut);
991    let future_tx2 = future_tx1.clone();
992
993    R::spawn_local(async move {
994        let locals2 = locals.clone();
995
996        if let Err(e) = R::spawn_local(async move {
997            let result = R::scope_local(
998                locals2.clone(),
999                Cancellable::new_with_cancel_rx(fut, cancel_rx),
1000            )
1001            .await;
1002
1003            Python::with_gil(move |py| {
1004                if cancelled(future_tx1.as_ref(py))
1005                    .map_err(dump_err(py))
1006                    .unwrap_or(false)
1007                {
1008                    return;
1009                }
1010
1011                let _ = set_result(
1012                    locals2.event_loop.as_ref(py),
1013                    future_tx1.as_ref(py),
1014                    result.map(|val| val.into_py(py)),
1015                )
1016                .map_err(dump_err(py));
1017            });
1018        })
1019        .await
1020        {
1021            if e.is_panic() {
1022                Python::with_gil(move |py| {
1023                    if cancelled(future_tx2.as_ref(py))
1024                        .map_err(dump_err(py))
1025                        .unwrap_or(false)
1026                    {
1027                        return;
1028                    }
1029
1030                    let panic_message = format!(
1031                        "rust future panicked: {}",
1032                        get_panic_message(&e.into_panic())
1033                    );
1034                    let _ = set_result(
1035                        locals.event_loop.as_ref(py),
1036                        future_tx2.as_ref(py),
1037                        Err(RustPanic::new_err(panic_message)),
1038                    )
1039                    .map_err(dump_err(py));
1040                });
1041            }
1042        }
1043    });
1044
1045    Ok(py_fut)
1046}
1047
1048/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1049///
1050/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1051/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1052///
1053/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1054/// via [`into_future`] (new behaviour in `v0.15`).
1055///
1056/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
1057/// unfortunately fail to resolve them when called within the Rust future. This is because the
1058/// function is being called from a Rust thread, not inside an actual Python coroutine context.
1059/// >
1060/// > As a workaround, you can get the `contextvars` from the current task locals using
1061/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
1062/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
1063/// synchronous function, and restore the previous context when it returns or raises an exception.
1064///
1065/// # Arguments
1066/// * `py` - The current PyO3 GIL guard
1067/// * `fut` - The Rust future to be converted
1068///
1069/// # Examples
1070///
1071/// ```no_run
1072/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1073/// #
1074/// # use pyo3_asyncio::{
1075/// #     TaskLocals,
1076/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1077/// # };
1078/// #
1079/// # struct MyCustomJoinError;
1080/// #
1081/// # impl JoinError for MyCustomJoinError {
1082/// #     fn is_panic(&self) -> bool {
1083/// #         unreachable!()
1084/// #     }
1085/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1086/// #         unreachable!()
1087/// #     }
1088/// # }
1089/// #
1090/// # struct MyCustomJoinHandle;
1091/// #
1092/// # impl Future for MyCustomJoinHandle {
1093/// #     type Output = Result<(), MyCustomJoinError>;
1094/// #
1095/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1096/// #         unreachable!()
1097/// #     }
1098/// # }
1099/// #
1100/// # struct MyCustomRuntime;
1101/// #
1102/// # impl MyCustomRuntime {
1103/// #     async fn sleep(_: Duration) {
1104/// #         unreachable!()
1105/// #     }
1106/// # }
1107/// #
1108/// # impl Runtime for MyCustomRuntime {
1109/// #     type JoinError = MyCustomJoinError;
1110/// #     type JoinHandle = MyCustomJoinHandle;
1111/// #
1112/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1113/// #     where
1114/// #         F: Future<Output = ()> + Send + 'static
1115/// #     {
1116/// #         unreachable!()
1117/// #     }
1118/// # }
1119/// #
1120/// # impl ContextExt for MyCustomRuntime {
1121/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1122/// #     where
1123/// #         F: Future<Output = R> + Send + 'static
1124/// #     {
1125/// #         unreachable!()
1126/// #     }
1127/// #     fn get_task_locals() -> Option<TaskLocals> {
1128/// #         unreachable!()
1129/// #     }
1130/// # }
1131/// #
1132/// # impl SpawnLocalExt for MyCustomRuntime {
1133/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
1134/// #     where
1135/// #         F: Future<Output = ()> + 'static
1136/// #     {
1137/// #         unreachable!()
1138/// #     }
1139/// # }
1140/// #
1141/// # impl LocalContextExt for MyCustomRuntime {
1142/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1143/// #     where
1144/// #         F: Future<Output = R> + 'static
1145/// #     {
1146/// #         unreachable!()
1147/// #     }
1148/// # }
1149/// #
1150/// use std::{rc::Rc, time::Duration};
1151///
1152/// use pyo3::prelude::*;
1153///
1154/// /// Awaitable sleep function
1155/// #[pyfunction]
1156/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
1157///     // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py
1158///     let secs = Rc::new(secs);
1159///
1160///     pyo3_asyncio::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1161///         MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1162///         Ok(())
1163///     })
1164/// }
1165/// ```
1166#[deprecated(
1167    since = "0.18.0",
1168    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1169)]
1170#[allow(deprecated)]
1171pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<&PyAny>
1172where
1173    R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1174    F: Future<Output = PyResult<T>> + 'static,
1175    T: IntoPy<PyObject>,
1176{
1177    local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1178}
1179
1180/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1181///
1182/// **This API is marked as unstable** and is only available when the
1183/// `unstable-streams` crate feature is enabled. This comes with no
1184/// stability guarantees, and could be changed or removed at any time.
1185///
1186/// # Arguments
1187/// * `locals` - The current task locals
1188/// * `gen` - The Python async generator to be converted
1189///
1190/// # Examples
1191/// ```no_run
1192/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1193/// #
1194/// # use pyo3_asyncio::{
1195/// #     TaskLocals,
1196/// #     generic::{JoinError, ContextExt, Runtime}
1197/// # };
1198/// #
1199/// # struct MyCustomJoinError;
1200/// #
1201/// # impl JoinError for MyCustomJoinError {
1202/// #     fn is_panic(&self) -> bool {
1203/// #         unreachable!()
1204/// #     }
1205/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1206/// #         unreachable!()
1207/// #     }
1208/// # }
1209/// #
1210/// # struct MyCustomJoinHandle;
1211/// #
1212/// # impl Future for MyCustomJoinHandle {
1213/// #     type Output = Result<(), MyCustomJoinError>;
1214/// #
1215/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1216/// #         unreachable!()
1217/// #     }
1218/// # }
1219/// #
1220/// # struct MyCustomRuntime;
1221/// #
1222/// # impl Runtime for MyCustomRuntime {
1223/// #     type JoinError = MyCustomJoinError;
1224/// #     type JoinHandle = MyCustomJoinHandle;
1225/// #
1226/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1227/// #     where
1228/// #         F: Future<Output = ()> + Send + 'static
1229/// #     {
1230/// #         unreachable!()
1231/// #     }
1232/// # }
1233/// #
1234/// # impl ContextExt for MyCustomRuntime {
1235/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1236/// #     where
1237/// #         F: Future<Output = R> + Send + 'static
1238/// #     {
1239/// #         unreachable!()
1240/// #     }
1241/// #     fn get_task_locals() -> Option<TaskLocals> {
1242/// #         unreachable!()
1243/// #     }
1244/// # }
1245///
1246/// use pyo3::prelude::*;
1247/// use futures::{StreamExt, TryStreamExt};
1248///
1249/// const TEST_MOD: &str = r#"
1250/// import asyncio
1251///
1252/// async def gen():
1253///     for i in range(10):
1254///         await asyncio.sleep(0.1)
1255///         yield i
1256/// "#;
1257///
1258/// # async fn test_async_gen() -> PyResult<()> {
1259/// let stream = Python::with_gil(|py| {
1260///     let test_mod = PyModule::from_code(
1261///         py,
1262///         TEST_MOD,
1263///         "test_rust_coroutine/test_mod.py",
1264///         "test_mod",
1265///     )?;
1266///
1267///     pyo3_asyncio::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1268///         pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
1269///         test_mod.call_method0("gen")?
1270///     )
1271/// })?;
1272///
1273/// let vals = stream
1274///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
1275///     .try_collect::<Vec<i32>>()
1276///     .await?;
1277///
1278/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1279///
1280/// Ok(())
1281/// # }
1282/// ```
1283#[cfg(feature = "unstable-streams")]
1284pub fn into_stream_with_locals_v1<'p, R>(
1285    locals: TaskLocals,
1286    gen: &'p PyAny,
1287) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1288where
1289    R: Runtime,
1290{
1291    let (tx, rx) = async_channel::bounded(1);
1292    let anext = PyObject::from(gen.getattr("__anext__")?);
1293
1294    R::spawn(async move {
1295        loop {
1296            let fut = Python::with_gil(|py| -> PyResult<_> {
1297                into_future_with_locals(&locals, anext.as_ref(py).call0()?)
1298            });
1299            let item = match fut {
1300                Ok(fut) => match fut.await {
1301                    Ok(item) => Ok(item),
1302                    Err(e) => {
1303                        let stop_iter = Python::with_gil(|py| {
1304                            e.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
1305                        });
1306
1307                        if stop_iter {
1308                            // end the iteration
1309                            break;
1310                        } else {
1311                            Err(e)
1312                        }
1313                    }
1314                },
1315                Err(e) => Err(e),
1316            };
1317
1318            if tx.send(item).await.is_err() {
1319                // receiving side was dropped
1320                break;
1321            }
1322        }
1323    });
1324
1325    Ok(rx)
1326}
1327
1328/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1329///
1330/// **This API is marked as unstable** and is only available when the
1331/// `unstable-streams` crate feature is enabled. This comes with no
1332/// stability guarantees, and could be changed or removed at any time.
1333///
1334/// # Arguments
1335/// * `gen` - The Python async generator to be converted
1336///
1337/// # Examples
1338/// ```no_run
1339/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1340/// #
1341/// # use pyo3_asyncio::{
1342/// #     TaskLocals,
1343/// #     generic::{JoinError, ContextExt, Runtime}
1344/// # };
1345/// #
1346/// # struct MyCustomJoinError;
1347/// #
1348/// # impl JoinError for MyCustomJoinError {
1349/// #     fn is_panic(&self) -> bool {
1350/// #         unreachable!()
1351/// #     }
1352/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1353/// #         unreachable!()
1354/// #     }
1355/// # }
1356/// #
1357/// # struct MyCustomJoinHandle;
1358/// #
1359/// # impl Future for MyCustomJoinHandle {
1360/// #     type Output = Result<(), MyCustomJoinError>;
1361/// #
1362/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1363/// #         unreachable!()
1364/// #     }
1365/// # }
1366/// #
1367/// # struct MyCustomRuntime;
1368/// #
1369/// # impl Runtime for MyCustomRuntime {
1370/// #     type JoinError = MyCustomJoinError;
1371/// #     type JoinHandle = MyCustomJoinHandle;
1372/// #
1373/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1374/// #     where
1375/// #         F: Future<Output = ()> + Send + 'static
1376/// #     {
1377/// #         unreachable!()
1378/// #     }
1379/// # }
1380/// #
1381/// # impl ContextExt for MyCustomRuntime {
1382/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1383/// #     where
1384/// #         F: Future<Output = R> + Send + 'static
1385/// #     {
1386/// #         unreachable!()
1387/// #     }
1388/// #     fn get_task_locals() -> Option<TaskLocals> {
1389/// #         unreachable!()
1390/// #     }
1391/// # }
1392///
1393/// use pyo3::prelude::*;
1394/// use futures::{StreamExt, TryStreamExt};
1395///
1396/// const TEST_MOD: &str = r#"
1397/// import asyncio
1398///
1399/// async def gen():
1400///     for i in range(10):
1401///         await asyncio.sleep(0.1)
1402///         yield i
1403/// "#;
1404///
1405/// # async fn test_async_gen() -> PyResult<()> {
1406/// let stream = Python::with_gil(|py| {
1407///     let test_mod = PyModule::from_code(
1408///         py,
1409///         TEST_MOD,
1410///         "test_rust_coroutine/test_mod.py",
1411///         "test_mod",
1412///     )?;
1413///
1414///     pyo3_asyncio::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1415/// })?;
1416///
1417/// let vals = stream
1418///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
1419///     .try_collect::<Vec<i32>>()
1420///     .await?;
1421///
1422/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1423///
1424/// Ok(())
1425/// # }
1426/// ```
1427#[cfg(feature = "unstable-streams")]
1428pub fn into_stream_v1<'p, R>(
1429    gen: &'p PyAny,
1430) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1431where
1432    R: Runtime + ContextExt,
1433{
1434    into_stream_with_locals_v1::<R>(get_current_locals::<R>(gen.py())?, gen)
1435}
1436
1437fn py_true() -> PyObject {
1438    static TRUE: OnceCell<PyObject> = OnceCell::new();
1439    TRUE.get_or_init(|| Python::with_gil(|py| true.into_py(py)))
1440        .clone()
1441}
1442fn py_false() -> PyObject {
1443    static FALSE: OnceCell<PyObject> = OnceCell::new();
1444    FALSE
1445        .get_or_init(|| Python::with_gil(|py| false.into_py(py)))
1446        .clone()
1447}
1448
1449trait Sender: Send + 'static {
1450    fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject>;
1451    fn close(&mut self) -> PyResult<()>;
1452}
1453
1454struct GenericSender<R>
1455where
1456    R: Runtime,
1457{
1458    runtime: PhantomData<R>,
1459    tx: mpsc::Sender<PyObject>,
1460}
1461
1462impl<R> Sender for GenericSender<R>
1463where
1464    R: Runtime + ContextExt,
1465{
1466    fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject> {
1467        match self.tx.try_send(item.clone()) {
1468            Ok(_) => Ok(py_true()),
1469            Err(e) => {
1470                if e.is_full() {
1471                    let mut tx = self.tx.clone();
1472                    Python::with_gil(move |py| {
1473                        Ok(
1474                            future_into_py_with_locals::<R, _, PyObject>(py, locals, async move {
1475                                if tx.flush().await.is_err() {
1476                                    // receiving side disconnected
1477                                    return Ok(py_false());
1478                                }
1479                                if tx.send(item).await.is_err() {
1480                                    // receiving side disconnected
1481                                    return Ok(py_false());
1482                                }
1483                                Ok(py_true())
1484                            })?
1485                            .into(),
1486                        )
1487                    })
1488                } else {
1489                    Ok(py_false())
1490                }
1491            }
1492        }
1493    }
1494    fn close(&mut self) -> PyResult<()> {
1495        self.tx.close_channel();
1496        Ok(())
1497    }
1498}
1499
1500#[pyclass]
1501struct SenderGlue {
1502    locals: TaskLocals,
1503    tx: Box<dyn Sender>,
1504}
1505#[pymethods]
1506impl SenderGlue {
1507    pub fn send(&mut self, item: PyObject) -> PyResult<PyObject> {
1508        self.tx.send(self.locals.clone(), item)
1509    }
1510    pub fn close(&mut self) -> PyResult<()> {
1511        self.tx.close()
1512    }
1513}
1514
1515#[cfg(feature = "unstable-streams")]
1516const STREAM_GLUE: &str = r#"
1517import asyncio
1518
1519async def forward(gen, sender):
1520    async for item in gen:
1521        should_continue = sender.send(item)
1522
1523        if asyncio.iscoroutine(should_continue):
1524            should_continue = await should_continue
1525
1526        if should_continue:
1527            continue
1528        else:
1529            break
1530
1531    sender.close()
1532"#;
1533
1534/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1535///
1536/// **This API is marked as unstable** and is only available when the
1537/// `unstable-streams` crate feature is enabled. This comes with no
1538/// stability guarantees, and could be changed or removed at any time.
1539///
1540/// # Arguments
1541/// * `locals` - The current task locals
1542/// * `gen` - The Python async generator to be converted
1543///
1544/// # Examples
1545/// ```no_run
1546/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1547/// #
1548/// # use pyo3_asyncio::{
1549/// #     TaskLocals,
1550/// #     generic::{JoinError, ContextExt, Runtime}
1551/// # };
1552/// #
1553/// # struct MyCustomJoinError;
1554/// #
1555/// # impl JoinError for MyCustomJoinError {
1556/// #     fn is_panic(&self) -> bool {
1557/// #         unreachable!()
1558/// #     }
1559/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1560/// #         unreachable!()
1561/// #     }
1562/// # }
1563/// #
1564/// # struct MyCustomJoinHandle;
1565/// #
1566/// # impl Future for MyCustomJoinHandle {
1567/// #     type Output = Result<(), MyCustomJoinError>;
1568/// #
1569/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1570/// #         unreachable!()
1571/// #     }
1572/// # }
1573/// #
1574/// # struct MyCustomRuntime;
1575/// #
1576/// # impl Runtime for MyCustomRuntime {
1577/// #     type JoinError = MyCustomJoinError;
1578/// #     type JoinHandle = MyCustomJoinHandle;
1579/// #
1580/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1581/// #     where
1582/// #         F: Future<Output = ()> + Send + 'static
1583/// #     {
1584/// #         unreachable!()
1585/// #     }
1586/// # }
1587/// #
1588/// # impl ContextExt for MyCustomRuntime {
1589/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1590/// #     where
1591/// #         F: Future<Output = R> + Send + 'static
1592/// #     {
1593/// #         unreachable!()
1594/// #     }
1595/// #     fn get_task_locals() -> Option<TaskLocals> {
1596/// #         unreachable!()
1597/// #     }
1598/// # }
1599///
1600/// use pyo3::prelude::*;
1601/// use futures::{StreamExt, TryStreamExt};
1602///
1603/// const TEST_MOD: &str = r#"
1604/// import asyncio
1605///
1606/// async def gen():
1607///     for i in range(10):
1608///         await asyncio.sleep(0.1)
1609///         yield i
1610/// "#;
1611///
1612/// # async fn test_async_gen() -> PyResult<()> {
1613/// let stream = Python::with_gil(|py| {
1614///     let test_mod = PyModule::from_code(
1615///         py,
1616///         TEST_MOD,
1617///         "test_rust_coroutine/test_mod.py",
1618///         "test_mod",
1619///     )?;
1620///
1621///     pyo3_asyncio::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1622///         pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
1623///         test_mod.call_method0("gen")?
1624///     )
1625/// })?;
1626///
1627/// let vals = stream
1628///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
1629///     .try_collect::<Vec<i32>>()
1630///     .await?;
1631///
1632/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1633///
1634/// Ok(())
1635/// # }
1636/// ```
1637#[cfg(feature = "unstable-streams")]
1638pub fn into_stream_with_locals_v2<'p, R>(
1639    locals: TaskLocals,
1640    gen: &'p PyAny,
1641) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1642where
1643    R: Runtime + ContextExt,
1644{
1645    static GLUE_MOD: OnceCell<PyObject> = OnceCell::new();
1646    let py = gen.py();
1647    let glue = GLUE_MOD
1648        .get_or_try_init(|| -> PyResult<PyObject> {
1649            Ok(PyModule::from_code(
1650                py,
1651                STREAM_GLUE,
1652                "pyo3_asyncio/pyo3_asyncio_glue.py",
1653                "pyo3_asyncio_glue",
1654            )?
1655            .into())
1656        })?
1657        .as_ref(py);
1658
1659    let (tx, rx) = mpsc::channel(10);
1660
1661    locals.event_loop(py).call_method1(
1662        "call_soon_threadsafe",
1663        (
1664            locals.event_loop(py).getattr("create_task")?,
1665            glue.call_method1(
1666                "forward",
1667                (
1668                    gen,
1669                    SenderGlue {
1670                        locals,
1671                        tx: Box::new(GenericSender {
1672                            runtime: PhantomData::<R>,
1673                            tx,
1674                        }),
1675                    },
1676                ),
1677            )?,
1678        ),
1679    )?;
1680    Ok(rx)
1681}
1682
1683/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1684///
1685/// **This API is marked as unstable** and is only available when the
1686/// `unstable-streams` crate feature is enabled. This comes with no
1687/// stability guarantees, and could be changed or removed at any time.
1688///
1689/// # Arguments
1690/// * `gen` - The Python async generator to be converted
1691///
1692/// # Examples
1693/// ```no_run
1694/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1695/// #
1696/// # use pyo3_asyncio::{
1697/// #     TaskLocals,
1698/// #     generic::{JoinError, ContextExt, Runtime}
1699/// # };
1700/// #
1701/// # struct MyCustomJoinError;
1702/// #
1703/// # impl JoinError for MyCustomJoinError {
1704/// #     fn is_panic(&self) -> bool {
1705/// #         unreachable!()
1706/// #     }
1707/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1708/// #         unreachable!()
1709/// #     }
1710/// # }
1711/// #
1712/// # struct MyCustomJoinHandle;
1713/// #
1714/// # impl Future for MyCustomJoinHandle {
1715/// #     type Output = Result<(), MyCustomJoinError>;
1716/// #
1717/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1718/// #         unreachable!()
1719/// #     }
1720/// # }
1721/// #
1722/// # struct MyCustomRuntime;
1723/// #
1724/// # impl Runtime for MyCustomRuntime {
1725/// #     type JoinError = MyCustomJoinError;
1726/// #     type JoinHandle = MyCustomJoinHandle;
1727/// #
1728/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1729/// #     where
1730/// #         F: Future<Output = ()> + Send + 'static
1731/// #     {
1732/// #         unreachable!()
1733/// #     }
1734/// # }
1735/// #
1736/// # impl ContextExt for MyCustomRuntime {
1737/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1738/// #     where
1739/// #         F: Future<Output = R> + Send + 'static
1740/// #     {
1741/// #         unreachable!()
1742/// #     }
1743/// #     fn get_task_locals() -> Option<TaskLocals> {
1744/// #         unreachable!()
1745/// #     }
1746/// # }
1747///
1748/// use pyo3::prelude::*;
1749/// use futures::{StreamExt, TryStreamExt};
1750///
1751/// const TEST_MOD: &str = r#"
1752/// import asyncio
1753///
1754/// async def gen():
1755///     for i in range(10):
1756///         await asyncio.sleep(0.1)
1757///         yield i
1758/// "#;
1759///
1760/// # async fn test_async_gen() -> PyResult<()> {
1761/// let stream = Python::with_gil(|py| {
1762///     let test_mod = PyModule::from_code(
1763///         py,
1764///         TEST_MOD,
1765///         "test_rust_coroutine/test_mod.py",
1766///         "test_mod",
1767///     )?;
1768///
1769///     pyo3_asyncio::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1770/// })?;
1771///
1772/// let vals = stream
1773///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
1774///     .try_collect::<Vec<i32>>()
1775///     .await?;
1776///
1777/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1778///
1779/// Ok(())
1780/// # }
1781/// ```
1782#[cfg(feature = "unstable-streams")]
1783pub fn into_stream_v2<'p, R>(
1784    gen: &'p PyAny,
1785) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1786where
1787    R: Runtime + ContextExt,
1788{
1789    into_stream_with_locals_v2::<R>(get_current_locals::<R>(gen.py())?, gen)
1790}