pyo3_asyncio_0_21/
generic.rs

1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio-0-21]
12//! version = "0.21"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17    future::Future,
18    marker::PhantomData,
19    pin::Pin,
20    sync::{Arc, Mutex},
21    task::{Context, Poll},
22};
23
24use futures::{
25    channel::{mpsc, oneshot},
26    SinkExt,
27};
28use once_cell::sync::OnceCell;
29use pin_project_lite::pin_project;
30use pyo3::prelude::*;
31
32use crate::{
33    asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
34    get_running_loop, into_future_with_locals, TaskLocals,
35};
36
/// Runtime-agnostic view of the error produced when joining a spawned task fails.
pub trait JoinError {
    /// Returns `true` when the spawned task terminated because it panicked.
    fn is_panic(&self) -> bool;

    /// Consumes the error and yields the panic payload.
    ///
    /// Panics if `is_panic` is not true.
    fn into_panic(self) -> Box<dyn std::any::Any + Send>;
}
44
45/// Generic Rust async/await runtime
46pub trait Runtime: Send + 'static {
47    /// The error returned by a JoinHandle after being awaited
48    type JoinError: JoinError + Send;
49    /// A future that completes with the result of the spawned task
50    type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
51
52    /// Spawn a future onto this runtime's event loop
53    fn spawn<F>(fut: F) -> Self::JoinHandle
54    where
55        F: Future<Output = ()> + Send + 'static;
56}
57
58/// Extension trait for async/await runtimes that support spawning local tasks
59pub trait SpawnLocalExt: Runtime {
60    /// Spawn a !Send future onto this runtime's event loop
61    fn spawn_local<F>(fut: F) -> Self::JoinHandle
62    where
63        F: Future<Output = ()> + 'static;
64}
65
66/// Exposes the utilities necessary for using task-local data in the Runtime
67pub trait ContextExt: Runtime {
68    /// Set the task locals for the given future
69    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
70    where
71        F: Future<Output = R> + Send + 'static;
72
73    /// Get the task locals for the current task
74    fn get_task_locals() -> Option<TaskLocals>;
75}
76
77/// Adds the ability to scope task-local data for !Send futures
78pub trait LocalContextExt: Runtime {
79    /// Set the task locals for the given !Send future
80    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
81    where
82        F: Future<Output = R> + 'static;
83}
84
85/// Get the current event loop from either Python or Rust async task local context
86///
87/// This function first checks if the runtime has a task-local reference to the Python event loop.
88/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
89/// with the current OS thread.
90pub fn get_current_loop<R>(py: Python) -> PyResult<Bound<PyAny>>
91where
92    R: ContextExt,
93{
94    if let Some(locals) = R::get_task_locals() {
95        Ok(locals.event_loop.into_bound(py))
96    } else {
97        get_running_loop(py)
98    }
99}
100
101/// Either copy the task locals from the current task OR get the current running loop and
102/// contextvars from Python.
103pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
104where
105    R: ContextExt,
106{
107    if let Some(locals) = R::get_task_locals() {
108        Ok(locals)
109    } else {
110        Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
111    }
112}
113
114/// Run the event loop until the given Future completes
115///
116/// After this function returns, the event loop can be resumed with [`run_until_complete`]
117///
118/// # Arguments
119/// * `event_loop` - The Python event loop that should run the future
120/// * `fut` - The future to drive to completion
121///
122/// # Examples
123///
124/// ```no_run
125/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
126/// #
127/// # use pyo3_asyncio_0_21::{
128/// #     TaskLocals,
129/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
130/// # };
131/// #
132/// # struct MyCustomJoinError;
133/// #
134/// # impl JoinError for MyCustomJoinError {
135/// #     fn is_panic(&self) -> bool {
136/// #         unreachable!()
137/// #     }
138/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
139/// #         unreachable!()
140/// #     }
141/// # }
142/// #
143/// # struct MyCustomJoinHandle;
144/// #
145/// # impl Future for MyCustomJoinHandle {
146/// #     type Output = Result<(), MyCustomJoinError>;
147/// #
148/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
149/// #         unreachable!()
150/// #     }
151/// # }
152/// #
153/// # struct MyCustomRuntime;
154/// #
155/// # impl Runtime for MyCustomRuntime {
156/// #     type JoinError = MyCustomJoinError;
157/// #     type JoinHandle = MyCustomJoinHandle;
158/// #
159/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
160/// #     where
161/// #         F: Future<Output = ()> + Send + 'static
162/// #     {
163/// #         unreachable!()
164/// #     }
165/// # }
166/// #
167/// # impl ContextExt for MyCustomRuntime {
168/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
169/// #     where
170/// #         F: Future<Output = R> + Send + 'static
171/// #     {
172/// #         unreachable!()
173/// #     }
174/// #     fn get_task_locals() -> Option<TaskLocals> {
175/// #         unreachable!()
176/// #     }
177/// # }
178/// #
179/// # use std::time::Duration;
180/// #
181/// # use pyo3::prelude::*;
182/// #
183/// # Python::with_gil(|py| -> PyResult<()> {
184/// # let event_loop = py.import_bound("asyncio")?.call_method0("new_event_loop")?;
185/// # #[cfg(feature = "tokio-runtime")]
186/// pyo3_asyncio_0_21::generic::run_until_complete::<MyCustomRuntime, _, _>(&event_loop, async move {
187///     tokio::time::sleep(Duration::from_secs(1)).await;
188///     Ok(())
189/// })?;
190/// # Ok(())
191/// # }).unwrap();
192/// ```
193pub fn run_until_complete<R, F, T>(event_loop: &Bound<PyAny>, fut: F) -> PyResult<T>
194where
195    R: Runtime + ContextExt,
196    F: Future<Output = PyResult<T>> + Send + 'static,
197    T: Send + Sync + 'static,
198{
199    let py = event_loop.py();
200    let result_tx = Arc::new(Mutex::new(None));
201    let result_rx = Arc::clone(&result_tx);
202    let coro = future_into_py_with_locals::<R, _, ()>(
203        py,
204        TaskLocals::new(event_loop.clone()).copy_context(py)?,
205        async move {
206            let val = fut.await?;
207            if let Ok(mut result) = result_tx.lock() {
208                *result = Some(val);
209            }
210            Ok(())
211        },
212    )?;
213
214    event_loop.call_method1("run_until_complete", (coro,))?;
215
216    let result = result_rx.lock().unwrap().take().unwrap();
217    Ok(result)
218}
219
220/// Run the event loop until the given Future completes
221///
222/// # Arguments
223/// * `py` - The current PyO3 GIL guard
224/// * `fut` - The future to drive to completion
225///
226/// # Examples
227///
228/// ```no_run
229/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
230/// #
231/// # use pyo3_asyncio_0_21::{
232/// #     TaskLocals,
233/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
234/// # };
235/// #
236/// # struct MyCustomJoinError;
237/// #
238/// # impl JoinError for MyCustomJoinError {
239/// #     fn is_panic(&self) -> bool {
240/// #         unreachable!()
241/// #     }
242/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
243/// #         unreachable!()
244/// #     }
245/// # }
246/// #
247/// # struct MyCustomJoinHandle;
248/// #
249/// # impl Future for MyCustomJoinHandle {
250/// #     type Output = Result<(), MyCustomJoinError>;
251/// #
252/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
253/// #         unreachable!()
254/// #     }
255/// # }
256/// #
257/// # struct MyCustomRuntime;
258/// #
259/// # impl Runtime for MyCustomRuntime {
260/// #     type JoinError = MyCustomJoinError;
261/// #     type JoinHandle = MyCustomJoinHandle;
262/// #
263/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
264/// #     where
265/// #         F: Future<Output = ()> + Send + 'static
266/// #     {
267/// #         unreachable!()
268/// #     }
269/// # }
270/// #
271/// # impl ContextExt for MyCustomRuntime {
272/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
273/// #     where
274/// #         F: Future<Output = R> + Send + 'static
275/// #     {
276/// #         unreachable!()
277/// #     }
278/// #     fn get_task_locals() -> Option<TaskLocals> {
279/// #         unreachable!()
280/// #     }
281/// # }
282/// #
283/// # use std::time::Duration;
284/// # async fn custom_sleep(_duration: Duration) { }
285/// #
286/// # use pyo3::prelude::*;
287/// #
288/// fn main() {
289///     Python::with_gil(|py| {
290///         pyo3_asyncio_0_21::generic::run::<MyCustomRuntime, _, _>(py, async move {
291///             custom_sleep(Duration::from_secs(1)).await;
292///             Ok(())
293///         })
294///         .map_err(|e| {
295///             e.print_and_set_sys_last_vars(py);
296///         })
297///         .unwrap();
298///     })
299/// }
300/// ```
301pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
302where
303    R: Runtime + ContextExt,
304    F: Future<Output = PyResult<T>> + Send + 'static,
305    T: Send + Sync + 'static,
306{
307    let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
308
309    let result = run_until_complete::<R, F, T>(&event_loop, fut);
310
311    close(event_loop)?;
312
313    result
314}
315
316fn cancelled(future: &Bound<PyAny>) -> PyResult<bool> {
317    future.getattr("cancelled")?.call0()?.is_truthy()
318}
319
320#[pyclass]
321struct CheckedCompletor;
322
323#[pymethods]
324impl CheckedCompletor {
325    fn __call__(
326        &self,
327        future: &Bound<PyAny>,
328        complete: &Bound<PyAny>,
329        value: &Bound<PyAny>,
330    ) -> PyResult<()> {
331        if cancelled(future)? {
332            return Ok(());
333        }
334
335        complete.call1((value,))?;
336
337        Ok(())
338    }
339}
340
341fn set_result(
342    event_loop: &Bound<PyAny>,
343    future: &Bound<PyAny>,
344    result: PyResult<PyObject>,
345) -> PyResult<()> {
346    let py = event_loop.py();
347    let none = py.None().into_bound(py);
348
349    let (complete, val) = match result {
350        Ok(val) => (future.getattr("set_result")?, val.into_py(py)),
351        Err(err) => (future.getattr("set_exception")?, err.into_py(py)),
352    };
353    call_soon_threadsafe(event_loop, &none, (CheckedCompletor, future, complete, val))?;
354
355    Ok(())
356}
357
358/// Convert a Python `awaitable` into a Rust Future
359///
360/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
361/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
362/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
363///
364/// # Arguments
365/// * `awaitable` - The Python `awaitable` to be converted
366///
367/// # Examples
368///
369/// ```no_run
370/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
371/// #
372/// # use pyo3::prelude::*;
373/// #
374/// # use pyo3_asyncio_0_21::{
375/// #     TaskLocals,
376/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
377/// # };
378/// #
379/// # struct MyCustomJoinError;
380/// #
381/// # impl JoinError for MyCustomJoinError {
382/// #     fn is_panic(&self) -> bool {
383/// #         unreachable!()
384/// #     }
385/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
386/// #         unreachable!()
387/// #     }
388/// # }
389/// #
390/// # struct MyCustomJoinHandle;
391/// #
392/// # impl Future for MyCustomJoinHandle {
393/// #     type Output = Result<(), MyCustomJoinError>;
394/// #
395/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
396/// #         unreachable!()
397/// #     }
398/// # }
399/// #
400/// # struct MyCustomRuntime;
401/// #
402/// # impl MyCustomRuntime {
403/// #     async fn sleep(_: Duration) {
404/// #         unreachable!()
405/// #     }
406/// # }
407/// #
408/// # impl Runtime for MyCustomRuntime {
409/// #     type JoinError = MyCustomJoinError;
410/// #     type JoinHandle = MyCustomJoinHandle;
411/// #
412/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
413/// #     where
414/// #         F: Future<Output = ()> + Send + 'static
415/// #     {
416/// #         unreachable!()
417/// #     }
418/// # }
419/// #
420/// # impl ContextExt for MyCustomRuntime {
421/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
422/// #     where
423/// #         F: Future<Output = R> + Send + 'static
424/// #     {
425/// #         unreachable!()
426/// #     }
427/// #     fn get_task_locals() -> Option<TaskLocals> {
428/// #         unreachable!()
429/// #     }
430/// # }
431/// #
432/// const PYTHON_CODE: &'static str = r#"
433/// import asyncio
434///
435/// async def py_sleep(duration):
436///     await asyncio.sleep(duration)
437/// "#;
438///
439/// async fn py_sleep(seconds: f32) -> PyResult<()> {
440///     let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
441///         Ok(
442///             PyModule::from_code_bound(
443///                 py,
444///                 PYTHON_CODE,
445///                 "test_into_future/test_mod.py",
446///                 "test_mod"
447///             )?
448///             .into()
449///         )
450///     })?;
451///
452///     Python::with_gil(|py| {
453///         pyo3_asyncio_0_21::generic::into_future::<MyCustomRuntime>(
454///             test_mod
455///                 .call_method1(py, "py_sleep", (seconds.into_py(py),))?
456///                 .into_bound(py),
457///         )
458///     })?
459///     .await?;
460///     Ok(())
461/// }
462/// ```
463pub fn into_future<R>(
464    awaitable: Bound<PyAny>,
465) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
466where
467    R: Runtime + ContextExt,
468{
469    into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
470}
471
472/// Convert a Rust Future into a Python awaitable with a generic runtime
473///
474/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
475/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
476///
477/// Python `contextvars` are preserved when calling async Python functions within the Rust future
478/// via [`into_future`] (new behaviour in `v0.15`).
479///
480/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
481/// unfortunately fail to resolve them when called within the Rust future. This is because the
482/// function is being called from a Rust thread, not inside an actual Python coroutine context.
483/// >
484/// > As a workaround, you can get the `contextvars` from the current task locals using
485/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
486/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
487/// synchronous function, and restore the previous context when it returns or raises an exception.
488///
489/// # Arguments
490/// * `py` - PyO3 GIL guard
491/// * `locals` - The task-local data for Python
492/// * `fut` - The Rust future to be converted
493///
494/// # Examples
495///
496/// ```no_run
497/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
498/// #
499/// # use pyo3_asyncio_0_21::{
500/// #     TaskLocals,
501/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
502/// # };
503/// #
504/// # struct MyCustomJoinError;
505/// #
506/// # impl JoinError for MyCustomJoinError {
507/// #     fn is_panic(&self) -> bool {
508/// #         unreachable!()
509/// #     }
510/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
511/// #         unreachable!()
512/// #     }
513/// # }
514/// #
515/// # struct MyCustomJoinHandle;
516/// #
517/// # impl Future for MyCustomJoinHandle {
518/// #     type Output = Result<(), MyCustomJoinError>;
519/// #
520/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
521/// #         unreachable!()
522/// #     }
523/// # }
524/// #
525/// # struct MyCustomRuntime;
526/// #
527/// # impl MyCustomRuntime {
528/// #     async fn sleep(_: Duration) {
529/// #         unreachable!()
530/// #     }
531/// # }
532/// #
533/// # impl Runtime for MyCustomRuntime {
534/// #     type JoinError = MyCustomJoinError;
535/// #     type JoinHandle = MyCustomJoinHandle;
536/// #
537/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
538/// #     where
539/// #         F: Future<Output = ()> + Send + 'static
540/// #     {
541/// #         unreachable!()
542/// #     }
543/// # }
544/// #
545/// # impl ContextExt for MyCustomRuntime {
546/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
547/// #     where
548/// #         F: Future<Output = R> + Send + 'static
549/// #     {
550/// #         unreachable!()
551/// #     }
552/// #     fn get_task_locals() -> Option<TaskLocals> {
553/// #         unreachable!()
554/// #     }
555/// # }
556/// #
557/// use std::time::Duration;
558///
559/// use pyo3::prelude::*;
560///
561/// /// Awaitable sleep function
562/// #[pyfunction]
563/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
564///     let secs = secs.extract()?;
565///     pyo3_asyncio_0_21::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
566///         py,
567///         pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
568///         async move {
569///             MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
570///             Ok(())
571///         }
572///     )
573/// }
574/// ```
575#[allow(unused_must_use)]
576pub fn future_into_py_with_locals<R, F, T>(
577    py: Python,
578    locals: TaskLocals,
579    fut: F,
580) -> PyResult<Bound<PyAny>>
581where
582    R: Runtime + ContextExt,
583    F: Future<Output = PyResult<T>> + Send + 'static,
584    T: IntoPy<PyObject>,
585{
586    let (cancel_tx, cancel_rx) = oneshot::channel();
587
588    let py_fut = create_future(locals.event_loop.clone().into_bound(py))?;
589    py_fut.call_method1(
590        "add_done_callback",
591        (PyDoneCallback {
592            cancel_tx: Some(cancel_tx),
593        },),
594    )?;
595
596    let future_tx1 = PyObject::from(py_fut.clone());
597    let future_tx2 = future_tx1.clone();
598
599    R::spawn(async move {
600        let locals2 = locals.clone();
601
602        if let Err(e) = R::spawn(async move {
603            let result = R::scope(
604                locals2.clone(),
605                Cancellable::new_with_cancel_rx(fut, cancel_rx),
606            )
607            .await;
608
609            Python::with_gil(move |py| {
610                if cancelled(future_tx1.bind(py))
611                    .map_err(dump_err(py))
612                    .unwrap_or(false)
613                {
614                    return;
615                }
616
617                let _ = set_result(
618                    &locals2.event_loop(py),
619                    future_tx1.bind(py),
620                    result.map(|val| val.into_py(py)),
621                )
622                .map_err(dump_err(py));
623            });
624        })
625        .await
626        {
627            if e.is_panic() {
628                Python::with_gil(move |py| {
629                    if cancelled(future_tx2.bind(py))
630                        .map_err(dump_err(py))
631                        .unwrap_or(false)
632                    {
633                        return;
634                    }
635
636                    let panic_message = format!(
637                        "rust future panicked: {}",
638                        get_panic_message(&e.into_panic())
639                    );
640                    let _ = set_result(
641                        locals.event_loop.bind(py),
642                        future_tx2.bind(py),
643                        Err(RustPanic::new_err(panic_message)),
644                    )
645                    .map_err(dump_err(py));
646                });
647            }
648        }
649    });
650
651    Ok(py_fut)
652}
653
/// Extract a human-readable message from a panic payload.
///
/// Payloads of type `&'static str` or `String` are returned as text. A payload that is still
/// wrapped in its `Box<dyn Any + Send>` is unwrapped first: a `&Box<dyn Any + Send>` argument
/// coerces to `&dyn Any` with the *box* as the concrete type, so without this step the text
/// downcasts could never match. Any other payload yields `"unknown error"`.
fn get_panic_message(any: &dyn std::any::Any) -> &str {
    if let Some(boxed) = any.downcast_ref::<Box<dyn std::any::Any + Send>>() {
        // `&**boxed` reborrows the value inside the box; passing `boxed` itself would coerce
        // the box again and recurse forever.
        get_panic_message(&**boxed)
    } else if let Some(str_slice) = any.downcast_ref::<&str>() {
        str_slice
    } else if let Some(string) = any.downcast_ref::<String>() {
        string
    } else {
        "unknown error"
    }
}
663
664pin_project! {
665    /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
666    #[must_use = "futures do nothing unless you `.await` or poll them"]
667    #[derive(Debug)]
668    struct Cancellable<T> {
669        #[pin]
670        future: T,
671        #[pin]
672        cancel_rx: oneshot::Receiver<()>,
673
674        poll_cancel_rx: bool
675    }
676}
677
678impl<T> Cancellable<T> {
679    fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
680        Self {
681            future,
682            cancel_rx,
683
684            poll_cancel_rx: true,
685        }
686    }
687}
688
689impl<F, T> Future for Cancellable<F>
690where
691    F: Future<Output = PyResult<T>>,
692    T: IntoPy<PyObject>,
693{
694    type Output = F::Output;
695
696    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
697        let this = self.project();
698
699        // First, try polling the future
700        if let Poll::Ready(v) = this.future.poll(cx) {
701            return Poll::Ready(v);
702        }
703
704        // Now check for cancellation
705        if *this.poll_cancel_rx {
706            match this.cancel_rx.poll(cx) {
707                Poll::Ready(Ok(())) => {
708                    *this.poll_cancel_rx = false;
709                    // The python future has already been cancelled, so this return value will never
710                    // be used.
711                    Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
712                        "unreachable",
713                    )))
714                }
715                Poll::Ready(Err(_)) => {
716                    *this.poll_cancel_rx = false;
717                    Poll::Pending
718                }
719                Poll::Pending => Poll::Pending,
720            }
721        } else {
722            Poll::Pending
723        }
724    }
725}
726
727#[pyclass]
728struct PyDoneCallback {
729    cancel_tx: Option<oneshot::Sender<()>>,
730}
731
732#[pymethods]
733impl PyDoneCallback {
734    pub fn __call__(&mut self, fut: &Bound<PyAny>) -> PyResult<()> {
735        let py = fut.py();
736
737        if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
738            let _ = self.cancel_tx.take().unwrap().send(());
739        }
740
741        Ok(())
742    }
743}
744
745/// Convert a Rust Future into a Python awaitable with a generic runtime
746///
747/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
748/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
749///
750/// Python `contextvars` are preserved when calling async Python functions within the Rust future
751/// via [`into_future`] (new behaviour in `v0.15`).
752///
753/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
754/// unfortunately fail to resolve them when called within the Rust future. This is because the
755/// function is being called from a Rust thread, not inside an actual Python coroutine context.
756/// >
757/// > As a workaround, you can get the `contextvars` from the current task locals using
758/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
759/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
760/// synchronous function, and restore the previous context when it returns or raises an exception.
761///
762/// # Arguments
763/// * `py` - The current PyO3 GIL guard
764/// * `fut` - The Rust future to be converted
765///
766/// # Examples
767///
768/// ```no_run
769/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
770/// #
771/// # use pyo3_asyncio_0_21::{
772/// #     TaskLocals,
773/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
774/// # };
775/// #
776/// # struct MyCustomJoinError;
777/// #
778/// # impl JoinError for MyCustomJoinError {
779/// #     fn is_panic(&self) -> bool {
780/// #         unreachable!()
781/// #     }
782/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
783/// #         unreachable!()
784/// #     }
785/// # }
786/// #
787/// # struct MyCustomJoinHandle;
788/// #
789/// # impl Future for MyCustomJoinHandle {
790/// #     type Output = Result<(), MyCustomJoinError>;
791/// #
792/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
793/// #         unreachable!()
794/// #     }
795/// # }
796/// #
797/// # struct MyCustomRuntime;
798/// #
799/// # impl MyCustomRuntime {
800/// #     async fn sleep(_: Duration) {
801/// #         unreachable!()
802/// #     }
803/// # }
804/// #
805/// # impl Runtime for MyCustomRuntime {
806/// #     type JoinError = MyCustomJoinError;
807/// #     type JoinHandle = MyCustomJoinHandle;
808/// #
809/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
810/// #     where
811/// #         F: Future<Output = ()> + Send + 'static
812/// #     {
813/// #         unreachable!()
814/// #     }
815/// # }
816/// #
817/// # impl ContextExt for MyCustomRuntime {
818/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
819/// #     where
820/// #         F: Future<Output = R> + Send + 'static
821/// #     {
822/// #         unreachable!()
823/// #     }
824/// #     fn get_task_locals() -> Option<TaskLocals> {
825/// #         unreachable!()
826/// #     }
827/// # }
828/// #
829/// use std::time::Duration;
830///
831/// use pyo3::prelude::*;
832///
833/// /// Awaitable sleep function
834/// #[pyfunction]
835/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
836///     let secs = secs.extract()?;
837///     pyo3_asyncio_0_21::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
838///         MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
839///         Ok(())
840///     })
841/// }
842/// ```
843pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
844where
845    R: Runtime + ContextExt,
846    F: Future<Output = PyResult<T>> + Send + 'static,
847    T: IntoPy<PyObject>,
848{
849    future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
850}
851
852/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
853/// specification of task locals.
854///
855/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
856/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
857///
858/// Python `contextvars` are preserved when calling async Python functions within the Rust future
859/// via [`into_future`] (new behaviour in `v0.15`).
860///
861/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
862/// unfortunately fail to resolve them when called within the Rust future. This is because the
863/// function is being called from a Rust thread, not inside an actual Python coroutine context.
864/// >
865/// > As a workaround, you can get the `contextvars` from the current task locals using
866/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
867/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
868/// synchronous function, and restore the previous context when it returns or raises an exception.
869///
870/// # Arguments
871/// * `py` - PyO3 GIL guard
872/// * `locals` - The task locals for the future
873/// * `fut` - The Rust future to be converted
874///
875/// # Examples
876///
877/// ```no_run
878/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
879/// #
880/// # use pyo3_asyncio_0_21::{
881/// #     TaskLocals,
882/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
883/// # };
884/// #
885/// # struct MyCustomJoinError;
886/// #
887/// # impl JoinError for MyCustomJoinError {
888/// #     fn is_panic(&self) -> bool {
889/// #         unreachable!()
890/// #     }
891/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
892/// #         unreachable!()
893/// #     }
894/// # }
895/// #
896/// # struct MyCustomJoinHandle;
897/// #
898/// # impl Future for MyCustomJoinHandle {
899/// #     type Output = Result<(), MyCustomJoinError>;
900/// #
901/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
902/// #         unreachable!()
903/// #     }
904/// # }
905/// #
906/// # struct MyCustomRuntime;
907/// #
908/// # impl MyCustomRuntime {
909/// #     async fn sleep(_: Duration) {
910/// #         unreachable!()
911/// #     }
912/// # }
913/// #
914/// # impl Runtime for MyCustomRuntime {
915/// #     type JoinError = MyCustomJoinError;
916/// #     type JoinHandle = MyCustomJoinHandle;
917/// #
918/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
919/// #     where
920/// #         F: Future<Output = ()> + Send + 'static
921/// #     {
922/// #         unreachable!()
923/// #     }
924/// # }
925/// #
926/// # impl ContextExt for MyCustomRuntime {
927/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
928/// #     where
929/// #         F: Future<Output = R> + Send + 'static
930/// #     {
931/// #         unreachable!()
932/// #     }
933/// #     fn get_task_locals() -> Option<TaskLocals> {
934/// #         unreachable!()
935/// #     }
936/// # }
937/// #
938/// # impl SpawnLocalExt for MyCustomRuntime {
939/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
940/// #     where
941/// #         F: Future<Output = ()> + 'static
942/// #     {
943/// #         unreachable!()
944/// #     }
945/// # }
946/// #
947/// # impl LocalContextExt for MyCustomRuntime {
948/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
949/// #     where
950/// #         F: Future<Output = R> + 'static
951/// #     {
952/// #         unreachable!()
953/// #     }
954/// # }
955/// #
956/// use std::{rc::Rc, time::Duration};
957///
958/// use pyo3::prelude::*;
959///
960/// /// Awaitable sleep function
961/// #[pyfunction]
962/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
963///     // Rc is !Send so it cannot be passed into pyo3_asyncio_0_21::generic::future_into_py
964///     let secs = Rc::new(secs);
965///
966///     pyo3_asyncio_0_21::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
967///         py,
968///         pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
969///         async move {
970///             MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
971///             Ok(())
972///         }
973///     )
974/// }
975/// ```
976#[deprecated(
977    since = "0.18.0",
978    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
979)]
980#[allow(unused_must_use)]
981pub fn local_future_into_py_with_locals<R, F, T>(
982    py: Python,
983    locals: TaskLocals,
984    fut: F,
985) -> PyResult<Bound<PyAny>>
986where
987    R: Runtime + SpawnLocalExt + LocalContextExt,
988    F: Future<Output = PyResult<T>> + 'static,
989    T: IntoPy<PyObject>,
990{
991    let (cancel_tx, cancel_rx) = oneshot::channel();
992
993    let py_fut = create_future(locals.event_loop.clone().into_bound(py))?;
994    py_fut.call_method1(
995        "add_done_callback",
996        (PyDoneCallback {
997            cancel_tx: Some(cancel_tx),
998        },),
999    )?;
1000
1001    let future_tx1 = PyObject::from(py_fut.clone());
1002    let future_tx2 = future_tx1.clone();
1003
1004    R::spawn_local(async move {
1005        let locals2 = locals.clone();
1006
1007        if let Err(e) = R::spawn_local(async move {
1008            let result = R::scope_local(
1009                locals2.clone(),
1010                Cancellable::new_with_cancel_rx(fut, cancel_rx),
1011            )
1012            .await;
1013
1014            Python::with_gil(move |py| {
1015                if cancelled(future_tx1.bind(py))
1016                    .map_err(dump_err(py))
1017                    .unwrap_or(false)
1018                {
1019                    return;
1020                }
1021
1022                let _ = set_result(
1023                    locals2.event_loop.bind(py),
1024                    future_tx1.bind(py),
1025                    result.map(|val| val.into_py(py)),
1026                )
1027                .map_err(dump_err(py));
1028            });
1029        })
1030        .await
1031        {
1032            if e.is_panic() {
1033                Python::with_gil(move |py| {
1034                    if cancelled(future_tx2.bind(py))
1035                        .map_err(dump_err(py))
1036                        .unwrap_or(false)
1037                    {
1038                        return;
1039                    }
1040
1041                    let panic_message = format!(
1042                        "rust future panicked: {}",
1043                        get_panic_message(&e.into_panic())
1044                    );
1045                    let _ = set_result(
1046                        locals.event_loop.bind(py),
1047                        future_tx2.bind(py),
1048                        Err(RustPanic::new_err(panic_message)),
1049                    )
1050                    .map_err(dump_err(py));
1051                });
1052            }
1053        }
1054    });
1055
1056    Ok(py_fut)
1057}
1058
1059/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1060///
1061/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1062/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1063///
1064/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1065/// via [`into_future`] (new behaviour in `v0.15`).
1066///
/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
/// > unfortunately fail to resolve them when called within the Rust future. This is because the
/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
/// >
/// > As a workaround, you can get the `contextvars` from the current task locals using
/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
/// > synchronous function, and restore the previous context when it returns or raises an exception.
1075///
1076/// # Arguments
1077/// * `py` - The current PyO3 GIL guard
1078/// * `fut` - The Rust future to be converted
1079///
1080/// # Examples
1081///
1082/// ```no_run
1083/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1084/// #
1085/// # use pyo3_asyncio_0_21::{
1086/// #     TaskLocals,
1087/// #     generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1088/// # };
1089/// #
1090/// # struct MyCustomJoinError;
1091/// #
1092/// # impl JoinError for MyCustomJoinError {
1093/// #     fn is_panic(&self) -> bool {
1094/// #         unreachable!()
1095/// #     }
1096/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1097/// #         unreachable!()
1098/// #     }
1099/// # }
1100/// #
1101/// # struct MyCustomJoinHandle;
1102/// #
1103/// # impl Future for MyCustomJoinHandle {
1104/// #     type Output = Result<(), MyCustomJoinError>;
1105/// #
1106/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1107/// #         unreachable!()
1108/// #     }
1109/// # }
1110/// #
1111/// # struct MyCustomRuntime;
1112/// #
1113/// # impl MyCustomRuntime {
1114/// #     async fn sleep(_: Duration) {
1115/// #         unreachable!()
1116/// #     }
1117/// # }
1118/// #
1119/// # impl Runtime for MyCustomRuntime {
1120/// #     type JoinError = MyCustomJoinError;
1121/// #     type JoinHandle = MyCustomJoinHandle;
1122/// #
1123/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1124/// #     where
1125/// #         F: Future<Output = ()> + Send + 'static
1126/// #     {
1127/// #         unreachable!()
1128/// #     }
1129/// # }
1130/// #
1131/// # impl ContextExt for MyCustomRuntime {
1132/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1133/// #     where
1134/// #         F: Future<Output = R> + Send + 'static
1135/// #     {
1136/// #         unreachable!()
1137/// #     }
1138/// #     fn get_task_locals() -> Option<TaskLocals> {
1139/// #         unreachable!()
1140/// #     }
1141/// # }
1142/// #
1143/// # impl SpawnLocalExt for MyCustomRuntime {
1144/// #     fn spawn_local<F>(fut: F) -> Self::JoinHandle
1145/// #     where
1146/// #         F: Future<Output = ()> + 'static
1147/// #     {
1148/// #         unreachable!()
1149/// #     }
1150/// # }
1151/// #
1152/// # impl LocalContextExt for MyCustomRuntime {
1153/// #     fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1154/// #     where
1155/// #         F: Future<Output = R> + 'static
1156/// #     {
1157/// #         unreachable!()
1158/// #     }
1159/// # }
1160/// #
1161/// use std::{rc::Rc, time::Duration};
1162///
1163/// use pyo3::prelude::*;
1164///
1165/// /// Awaitable sleep function
1166/// #[pyfunction]
1167/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
1168///     // Rc is !Send so it cannot be passed into pyo3_asyncio_0_21::generic::future_into_py
1169///     let secs = Rc::new(secs);
1170///
1171///     pyo3_asyncio_0_21::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1172///         MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1173///         Ok(())
1174///     })
1175/// }
1176/// ```
1177#[deprecated(
1178    since = "0.18.0",
1179    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1180)]
1181#[allow(deprecated)]
1182pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
1183where
1184    R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1185    F: Future<Output = PyResult<T>> + 'static,
1186    T: IntoPy<PyObject>,
1187{
1188    local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1189}
1190
1191/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1192///
1193/// **This API is marked as unstable** and is only available when the
1194/// `unstable-streams` crate feature is enabled. This comes with no
1195/// stability guarantees, and could be changed or removed at any time.
1196///
1197/// # Arguments
1198/// * `locals` - The current task locals
1199/// * `gen` - The Python async generator to be converted
1200///
1201/// # Examples
1202/// ```no_run
1203/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1204/// #
1205/// # use pyo3_asyncio_0_21::{
1206/// #     TaskLocals,
1207/// #     generic::{JoinError, ContextExt, Runtime}
1208/// # };
1209/// #
1210/// # struct MyCustomJoinError;
1211/// #
1212/// # impl JoinError for MyCustomJoinError {
1213/// #     fn is_panic(&self) -> bool {
1214/// #         unreachable!()
1215/// #     }
1216/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1217/// #         unreachable!()
1218/// #     }
1219/// # }
1220/// #
1221/// # struct MyCustomJoinHandle;
1222/// #
1223/// # impl Future for MyCustomJoinHandle {
1224/// #     type Output = Result<(), MyCustomJoinError>;
1225/// #
1226/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1227/// #         unreachable!()
1228/// #     }
1229/// # }
1230/// #
1231/// # struct MyCustomRuntime;
1232/// #
1233/// # impl Runtime for MyCustomRuntime {
1234/// #     type JoinError = MyCustomJoinError;
1235/// #     type JoinHandle = MyCustomJoinHandle;
1236/// #
1237/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1238/// #     where
1239/// #         F: Future<Output = ()> + Send + 'static
1240/// #     {
1241/// #         unreachable!()
1242/// #     }
1243/// # }
1244/// #
1245/// # impl ContextExt for MyCustomRuntime {
1246/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1247/// #     where
1248/// #         F: Future<Output = R> + Send + 'static
1249/// #     {
1250/// #         unreachable!()
1251/// #     }
1252/// #     fn get_task_locals() -> Option<TaskLocals> {
1253/// #         unreachable!()
1254/// #     }
1255/// # }
1256///
1257/// use pyo3::prelude::*;
1258/// use futures::{StreamExt, TryStreamExt};
1259///
1260/// const TEST_MOD: &str = r#"
1261/// import asyncio
1262///
1263/// async def gen():
1264///     for i in range(10):
1265///         await asyncio.sleep(0.1)
1266///         yield i
1267/// "#;
1268///
1269/// # async fn test_async_gen() -> PyResult<()> {
1270/// let stream = Python::with_gil(|py| {
1271///     let test_mod = PyModule::from_code_bound(
1272///         py,
1273///         TEST_MOD,
1274///         "test_rust_coroutine/test_mod.py",
1275///         "test_mod",
1276///     )?;
1277///
1278///     pyo3_asyncio_0_21::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1279///         pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
1280///         test_mod.call_method0("gen")?
1281///     )
1282/// })?;
1283///
1284/// let vals = stream
1285///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1286///     .try_collect::<Vec<i32>>()
1287///     .await?;
1288///
1289/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1290///
1291/// Ok(())
1292/// # }
1293/// ```
#[cfg(feature = "unstable-streams")]
#[allow(unused_must_use)] // False positive unused lint on `R::spawn`
pub fn into_stream_with_locals_v1<'p, R>(
    locals: TaskLocals,
    gen: Bound<'p, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
where
    R: Runtime,
{
    let (item_tx, item_rx) = async_channel::bounded(1);
    let anext = PyObject::from(gen.getattr("__anext__")?);

    R::spawn(async move {
        loop {
            // Call `__anext__()` under the GIL and turn the awaitable into a Rust future.
            let pending = Python::with_gil(|py| -> PyResult<_> {
                let awaitable = anext.bind(py).call0()?;
                into_future_with_locals(&locals, awaitable)
            });

            let outcome = match pending {
                Err(call_err) => Err(call_err),
                Ok(next) => match next.await {
                    // `StopAsyncIteration` ends the iteration without emitting an item.
                    Err(err)
                        if Python::with_gil(|py| {
                            err.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
                        }) =>
                    {
                        break
                    }
                    // Any other result (value or error) is forwarded as-is.
                    yielded => yielded,
                },
            };

            // A failed send means the receiving side was dropped.
            if item_tx.send(outcome).await.is_err() {
                break;
            }
        }
    });

    Ok(item_rx)
}
1339
1340/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1341///
1342/// **This API is marked as unstable** and is only available when the
1343/// `unstable-streams` crate feature is enabled. This comes with no
1344/// stability guarantees, and could be changed or removed at any time.
1345///
1346/// # Arguments
1347/// * `gen` - The Python async generator to be converted
1348///
1349/// # Examples
1350/// ```no_run
1351/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1352/// #
1353/// # use pyo3_asyncio_0_21::{
1354/// #     TaskLocals,
1355/// #     generic::{JoinError, ContextExt, Runtime}
1356/// # };
1357/// #
1358/// # struct MyCustomJoinError;
1359/// #
1360/// # impl JoinError for MyCustomJoinError {
1361/// #     fn is_panic(&self) -> bool {
1362/// #         unreachable!()
1363/// #     }
1364/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1365/// #         unreachable!()
1366/// #     }
1367/// # }
1368/// #
1369/// # struct MyCustomJoinHandle;
1370/// #
1371/// # impl Future for MyCustomJoinHandle {
1372/// #     type Output = Result<(), MyCustomJoinError>;
1373/// #
1374/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1375/// #         unreachable!()
1376/// #     }
1377/// # }
1378/// #
1379/// # struct MyCustomRuntime;
1380/// #
1381/// # impl Runtime for MyCustomRuntime {
1382/// #     type JoinError = MyCustomJoinError;
1383/// #     type JoinHandle = MyCustomJoinHandle;
1384/// #
1385/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1386/// #     where
1387/// #         F: Future<Output = ()> + Send + 'static
1388/// #     {
1389/// #         unreachable!()
1390/// #     }
1391/// # }
1392/// #
1393/// # impl ContextExt for MyCustomRuntime {
1394/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1395/// #     where
1396/// #         F: Future<Output = R> + Send + 'static
1397/// #     {
1398/// #         unreachable!()
1399/// #     }
1400/// #     fn get_task_locals() -> Option<TaskLocals> {
1401/// #         unreachable!()
1402/// #     }
1403/// # }
1404///
1405/// use pyo3::prelude::*;
1406/// use futures::{StreamExt, TryStreamExt};
1407///
1408/// const TEST_MOD: &str = r#"
1409/// import asyncio
1410///
1411/// async def gen():
1412///     for i in range(10):
1413///         await asyncio.sleep(0.1)
1414///         yield i
1415/// "#;
1416///
1417/// # async fn test_async_gen() -> PyResult<()> {
1418/// let stream = Python::with_gil(|py| {
1419///     let test_mod = PyModule::from_code_bound(
1420///         py,
1421///         TEST_MOD,
1422///         "test_rust_coroutine/test_mod.py",
1423///         "test_mod",
1424///     )?;
1425///
1426///     pyo3_asyncio_0_21::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1427/// })?;
1428///
1429/// let vals = stream
1430///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1431///     .try_collect::<Vec<i32>>()
1432///     .await?;
1433///
1434/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1435///
1436/// Ok(())
1437/// # }
1438/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<'p, R>(
    gen: Bound<'p, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
where
    R: Runtime + ContextExt,
{
    // Look up the current task locals using the generator's GIL token, then delegate.
    let locals = get_current_locals::<R>(gen.py())?;
    into_stream_with_locals_v1::<R>(locals, gen)
}
1448
1449fn py_true() -> PyObject {
1450    static TRUE: OnceCell<PyObject> = OnceCell::new();
1451    TRUE.get_or_init(|| Python::with_gil(|py| true.into_py(py)))
1452        .clone()
1453}
1454fn py_false() -> PyObject {
1455    static FALSE: OnceCell<PyObject> = OnceCell::new();
1456    FALSE
1457        .get_or_init(|| Python::with_gil(|py| false.into_py(py)))
1458        .clone()
1459}
1460
/// Object-safe sending half that `SenderGlue` stores as a `Box<dyn Sender>`.
trait Sender: Send + 'static {
    /// Offers `item` to the underlying channel.
    ///
    /// `locals` is passed along for implementations that need to build a Python awaitable.
    /// The returned Python object is what the Python-side caller inspects to decide whether to
    /// keep sending; in `GenericSender` it is either a Python bool or an awaitable.
    fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject>;
    /// Closes the sending side of the channel.
    fn close(&mut self) -> PyResult<()>;
}
1465
1466struct GenericSender<R>
1467where
1468    R: Runtime,
1469{
1470    runtime: PhantomData<R>,
1471    tx: mpsc::Sender<PyObject>,
1472}
1473
1474impl<R> Sender for GenericSender<R>
1475where
1476    R: Runtime + ContextExt,
1477{
1478    fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject> {
1479        match self.tx.try_send(item.clone()) {
1480            Ok(_) => Ok(py_true()),
1481            Err(e) => {
1482                if e.is_full() {
1483                    let mut tx = self.tx.clone();
1484                    Python::with_gil(move |py| {
1485                        Ok(
1486                            future_into_py_with_locals::<R, _, PyObject>(py, locals, async move {
1487                                if tx.flush().await.is_err() {
1488                                    // receiving side disconnected
1489                                    return Ok(py_false());
1490                                }
1491                                if tx.send(item).await.is_err() {
1492                                    // receiving side disconnected
1493                                    return Ok(py_false());
1494                                }
1495                                Ok(py_true())
1496                            })?
1497                            .into(),
1498                        )
1499                    })
1500                } else {
1501                    Ok(py_false())
1502                }
1503            }
1504        }
1505    }
1506    fn close(&mut self) -> PyResult<()> {
1507        self.tx.close_channel();
1508        Ok(())
1509    }
1510}
1511
1512#[pyclass]
1513struct SenderGlue {
1514    locals: TaskLocals,
1515    tx: Box<dyn Sender>,
1516}
1517#[pymethods]
1518impl SenderGlue {
1519    pub fn send(&mut self, item: PyObject) -> PyResult<PyObject> {
1520        self.tx.send(self.locals.clone(), item)
1521    }
1522    pub fn close(&mut self) -> PyResult<()> {
1523        self.tx.close()
1524    }
1525}
1526
// Python glue that drains an async generator into a `SenderGlue`.
//
// `sender.send(item)` returns either a plain bool or, when the channel is full, an awaitable
// produced by `future_into_py_with_locals` (an `asyncio.Future`, not a coroutine object).
// `asyncio.iscoroutine` is False for futures, so the awaitable was never awaited and, being
// truthy, always meant "continue" — defeating backpressure and disconnect detection.
// `inspect.isawaitable` recognises futures and coroutines alike.
#[cfg(feature = "unstable-streams")]
const STREAM_GLUE: &str = r#"
import inspect

async def forward(gen, sender):
    async for item in gen:
        should_continue = sender.send(item)

        if inspect.isawaitable(should_continue):
            should_continue = await should_continue

        if should_continue:
            continue
        else:
            break

    sender.close()
"#;
1545
1546/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1547///
1548/// **This API is marked as unstable** and is only available when the
1549/// `unstable-streams` crate feature is enabled. This comes with no
1550/// stability guarantees, and could be changed or removed at any time.
1551///
1552/// # Arguments
1553/// * `locals` - The current task locals
1554/// * `gen` - The Python async generator to be converted
1555///
1556/// # Examples
1557/// ```no_run
1558/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1559/// #
1560/// # use pyo3_asyncio_0_21::{
1561/// #     TaskLocals,
1562/// #     generic::{JoinError, ContextExt, Runtime}
1563/// # };
1564/// #
1565/// # struct MyCustomJoinError;
1566/// #
1567/// # impl JoinError for MyCustomJoinError {
1568/// #     fn is_panic(&self) -> bool {
1569/// #         unreachable!()
1570/// #     }
1571/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1572/// #         unreachable!()
1573/// #     }
1574/// # }
1575/// #
1576/// # struct MyCustomJoinHandle;
1577/// #
1578/// # impl Future for MyCustomJoinHandle {
1579/// #     type Output = Result<(), MyCustomJoinError>;
1580/// #
1581/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1582/// #         unreachable!()
1583/// #     }
1584/// # }
1585/// #
1586/// # struct MyCustomRuntime;
1587/// #
1588/// # impl Runtime for MyCustomRuntime {
1589/// #     type JoinError = MyCustomJoinError;
1590/// #     type JoinHandle = MyCustomJoinHandle;
1591/// #
1592/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1593/// #     where
1594/// #         F: Future<Output = ()> + Send + 'static
1595/// #     {
1596/// #         unreachable!()
1597/// #     }
1598/// # }
1599/// #
1600/// # impl ContextExt for MyCustomRuntime {
1601/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1602/// #     where
1603/// #         F: Future<Output = R> + Send + 'static
1604/// #     {
1605/// #         unreachable!()
1606/// #     }
1607/// #     fn get_task_locals() -> Option<TaskLocals> {
1608/// #         unreachable!()
1609/// #     }
1610/// # }
1611///
1612/// use pyo3::prelude::*;
1613/// use futures::{StreamExt, TryStreamExt};
1614///
1615/// const TEST_MOD: &str = r#"
1616/// import asyncio
1617///
1618/// async def gen():
1619///     for i in range(10):
1620///         await asyncio.sleep(0.1)
1621///         yield i
1622/// "#;
1623///
1624/// # async fn test_async_gen() -> PyResult<()> {
1625/// let stream = Python::with_gil(|py| {
1626///     let test_mod = PyModule::from_code_bound(
1627///         py,
1628///         TEST_MOD,
1629///         "test_rust_coroutine/test_mod.py",
1630///         "test_mod",
1631///     )?;
1632///
1633///     pyo3_asyncio_0_21::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1634///         pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
1635///         test_mod.call_method0("gen")?
1636///     )
1637/// })?;
1638///
1639/// let vals = stream
1640///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1641///     .try_collect::<Vec<i32>>()
1642///     .await?;
1643///
1644/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1645///
1646/// Ok(())
1647/// # }
1648/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2<'p, R>(
    locals: TaskLocals,
    gen: Bound<'p, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
where
    R: Runtime + ContextExt,
{
    // The glue module is compiled from `STREAM_GLUE` on first use and cached afterwards.
    static GLUE_MOD: OnceCell<PyObject> = OnceCell::new();
    let py = gen.py();

    let load_glue = || -> PyResult<PyObject> {
        let module = PyModule::from_code_bound(
            py,
            STREAM_GLUE,
            "pyo3_asyncio/pyo3_asyncio_glue.py",
            "pyo3_asyncio_glue",
        )?;
        Ok(module.into())
    };
    let glue = GLUE_MOD.get_or_try_init(load_glue)?.bind(py);

    let (tx, rx) = mpsc::channel(10);

    let event_loop = locals.event_loop(py);
    let create_task = event_loop.getattr("create_task")?;

    // Sending half handed to the Python `forward` coroutine.
    let sender = SenderGlue {
        locals,
        tx: Box::new(GenericSender {
            runtime: PhantomData::<R>,
            tx,
        }),
    };
    let forward_coro = glue.call_method1("forward", (gen, sender))?;

    // Ask the event loop to run `create_task(forward(gen, sender))`.
    event_loop.call_method1("call_soon_threadsafe", (create_task, forward_coro))?;

    Ok(rx)
}
1694
1695/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1696///
1697/// **This API is marked as unstable** and is only available when the
1698/// `unstable-streams` crate feature is enabled. This comes with no
1699/// stability guarantees, and could be changed or removed at any time.
1700///
1701/// # Arguments
1702/// * `gen` - The Python async generator to be converted
1703///
1704/// # Examples
1705/// ```no_run
1706/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1707/// #
1708/// # use pyo3_asyncio_0_21::{
1709/// #     TaskLocals,
1710/// #     generic::{JoinError, ContextExt, Runtime}
1711/// # };
1712/// #
1713/// # struct MyCustomJoinError;
1714/// #
1715/// # impl JoinError for MyCustomJoinError {
1716/// #     fn is_panic(&self) -> bool {
1717/// #         unreachable!()
1718/// #     }
1719/// #     fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1720/// #         unreachable!()
1721/// #     }
1722/// # }
1723/// #
1724/// # struct MyCustomJoinHandle;
1725/// #
1726/// # impl Future for MyCustomJoinHandle {
1727/// #     type Output = Result<(), MyCustomJoinError>;
1728/// #
1729/// #     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1730/// #         unreachable!()
1731/// #     }
1732/// # }
1733/// #
1734/// # struct MyCustomRuntime;
1735/// #
1736/// # impl Runtime for MyCustomRuntime {
1737/// #     type JoinError = MyCustomJoinError;
1738/// #     type JoinHandle = MyCustomJoinHandle;
1739/// #
1740/// #     fn spawn<F>(fut: F) -> Self::JoinHandle
1741/// #     where
1742/// #         F: Future<Output = ()> + Send + 'static
1743/// #     {
1744/// #         unreachable!()
1745/// #     }
1746/// # }
1747/// #
1748/// # impl ContextExt for MyCustomRuntime {
1749/// #     fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1750/// #     where
1751/// #         F: Future<Output = R> + Send + 'static
1752/// #     {
1753/// #         unreachable!()
1754/// #     }
1755/// #     fn get_task_locals() -> Option<TaskLocals> {
1756/// #         unreachable!()
1757/// #     }
1758/// # }
1759///
1760/// use pyo3::prelude::*;
1761/// use futures::{StreamExt, TryStreamExt};
1762///
1763/// const TEST_MOD: &str = r#"
1764/// import asyncio
1765///
1766/// async def gen():
1767///     for i in range(10):
1768///         await asyncio.sleep(0.1)
1769///         yield i
1770/// "#;
1771///
1772/// # async fn test_async_gen() -> PyResult<()> {
1773/// let stream = Python::with_gil(|py| {
1774///     let test_mod = PyModule::from_code_bound(
1775///         py,
1776///         TEST_MOD,
1777///         "test_rust_coroutine/test_mod.py",
1778///         "test_mod",
1779///     )?;
1780///
1781///     pyo3_asyncio_0_21::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1782/// })?;
1783///
1784/// let vals = stream
1785///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1786///     .try_collect::<Vec<i32>>()
1787///     .await?;
1788///
1789/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1790///
1791/// Ok(())
1792/// # }
1793/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<'p, R>(
    gen: Bound<'p, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
where
    R: Runtime + ContextExt,
{
    // Look up the current task locals using the generator's GIL token, then delegate.
    let locals = get_current_locals::<R>(gen.py())?;
    into_stream_with_locals_v2::<R>(locals, gen)
}