pyo3_asyncio/
async_std.rs

1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>async-std-runtime</code></span> PyO3 Asyncio functions specific to the async-std runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio]
12//! version = "0.20"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin};
17
18use async_std::task;
19use futures::FutureExt;
20use pyo3::prelude::*;
21
22use crate::{
23    generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt},
24    TaskLocals,
25};
26
27/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
28/// re-exports for macros
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// Re-exported so the `#[test]` macro can call `spawn_blocking` without the
    /// user depending on `async-std` directly.
    pub use ::async_std::task::spawn_blocking;
}
34
35/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> Provides the boilerplate for the `async-std` runtime and runs an async fn as main
36#[cfg(feature = "attributes")]
37pub use pyo3_asyncio_macros::async_std_main as main;
38
39/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
40/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
41/// Registers an `async-std` test with the `pyo3-asyncio` test harness
42#[cfg(all(feature = "attributes", feature = "testing"))]
43pub use pyo3_asyncio_macros::async_std_test as test;
44
45struct AsyncStdJoinErr(Box<dyn Any + Send + 'static>);
46
47impl JoinError for AsyncStdJoinErr {
48    fn is_panic(&self) -> bool {
49        true
50    }
51    fn into_panic(self) -> Box<dyn Any + Send + 'static> {
52        self.0
53    }
54}
55
56async_std::task_local! {
57    static TASK_LOCALS: RefCell<Option<TaskLocals>> = RefCell::new(None);
58}
59
60struct AsyncStdRuntime;
61
62impl Runtime for AsyncStdRuntime {
63    type JoinError = AsyncStdJoinErr;
64    type JoinHandle = task::JoinHandle<Result<(), AsyncStdJoinErr>>;
65
66    fn spawn<F>(fut: F) -> Self::JoinHandle
67    where
68        F: Future<Output = ()> + Send + 'static,
69    {
70        task::spawn(async move {
71            AssertUnwindSafe(fut)
72                .catch_unwind()
73                .await
74                .map_err(|e| AsyncStdJoinErr(e))
75        })
76    }
77}
78
79impl ContextExt for AsyncStdRuntime {
80    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
81    where
82        F: Future<Output = R> + Send + 'static,
83    {
84        let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
85        Box::pin(async move {
86            let result = fut.await;
87            TASK_LOCALS.with(|c| c.replace(old));
88            result
89        })
90    }
91
92    fn get_task_locals() -> Option<TaskLocals> {
93        match TASK_LOCALS.try_with(|c| c.borrow().clone()) {
94            Ok(locals) => locals,
95            Err(_) => None,
96        }
97    }
98}
99
100impl SpawnLocalExt for AsyncStdRuntime {
101    fn spawn_local<F>(fut: F) -> Self::JoinHandle
102    where
103        F: Future<Output = ()> + 'static,
104    {
105        task::spawn_local(async move {
106            fut.await;
107            Ok(())
108        })
109    }
110}
111
112impl LocalContextExt for AsyncStdRuntime {
113    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
114    where
115        F: Future<Output = R> + 'static,
116    {
117        let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
118        Box::pin(async move {
119            let result = fut.await;
120            TASK_LOCALS.with(|c| c.replace(old));
121            result
122        })
123    }
124}
125
126/// Set the task local event loop for the given future
127pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
128where
129    F: Future<Output = R> + Send + 'static,
130{
131    AsyncStdRuntime::scope(locals, fut).await
132}
133
134/// Set the task local event loop for the given !Send future
135pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
136where
137    F: Future<Output = R> + 'static,
138{
139    AsyncStdRuntime::scope_local(locals, fut).await
140}
141
142/// Get the current event loop from either Python or Rust async task local context
143///
144/// This function first checks if the runtime has a task-local reference to the Python event loop.
145/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
146/// associated with the current OS thread.
147pub fn get_current_loop(py: Python) -> PyResult<&PyAny> {
148    generic::get_current_loop::<AsyncStdRuntime>(py)
149}
150
151/// Either copy the task locals from the current task OR get the current running loop and
152/// contextvars from Python.
153pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
154    generic::get_current_locals::<AsyncStdRuntime>(py)
155}
156
157/// Run the event loop until the given Future completes
158///
159/// The event loop runs until the given future is complete.
160///
161/// After this function returns, the event loop can be resumed with [`run_until_complete`]
162///
163/// # Arguments
164/// * `event_loop` - The Python event loop that should run the future
165/// * `fut` - The future to drive to completion
166///
167/// # Examples
168///
169/// ```
170/// # use std::time::Duration;
171/// #
172/// # use pyo3::prelude::*;
173/// #
174/// # pyo3::prepare_freethreaded_python();
175/// #
176/// # Python::with_gil(|py| -> PyResult<()> {
177/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
178/// pyo3_asyncio::async_std::run_until_complete(event_loop, async move {
179///     async_std::task::sleep(Duration::from_secs(1)).await;
180///     Ok(())
181/// })?;
182/// # Ok(())
183/// # }).unwrap();
184/// ```
185pub fn run_until_complete<F, T>(event_loop: &PyAny, fut: F) -> PyResult<T>
186where
187    F: Future<Output = PyResult<T>> + Send + 'static,
188    T: Send + Sync + 'static,
189{
190    generic::run_until_complete::<AsyncStdRuntime, _, T>(event_loop, fut)
191}
192
193/// Run the event loop until the given Future completes
194///
195/// # Arguments
196/// * `py` - The current PyO3 GIL guard
197/// * `fut` - The future to drive to completion
198///
199/// # Examples
200///
201/// ```no_run
202/// # use std::time::Duration;
203/// #
204/// # use pyo3::prelude::*;
205/// #
206/// fn main() {
207///     // call this or use pyo3 0.14 "auto-initialize" feature
208///     pyo3::prepare_freethreaded_python();
209///
210///     Python::with_gil(|py| {
211///         pyo3_asyncio::async_std::run(py, async move {
212///             async_std::task::sleep(Duration::from_secs(1)).await;
213///             Ok(())
214///         })
215///         .map_err(|e| {
216///             e.print_and_set_sys_last_vars(py);
217///         })
218///         .unwrap();
219///     })
220/// }
221/// ```
222pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
223where
224    F: Future<Output = PyResult<T>> + Send + 'static,
225    T: Send + Sync + 'static,
226{
227    generic::run::<AsyncStdRuntime, F, T>(py, fut)
228}
229
230/// Convert a Rust Future into a Python awaitable
231///
232/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
233/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
234///
235/// Python `contextvars` are preserved when calling async Python functions within the Rust future
236/// via [`into_future`] (new behaviour in `v0.15`).
237///
238/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
239/// unfortunately fail to resolve them when called within the Rust future. This is because the
240/// function is being called from a Rust thread, not inside an actual Python coroutine context.
241/// >
242/// > As a workaround, you can get the `contextvars` from the current task locals using
243/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
244/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
245/// synchronous function, and restore the previous context when it returns or raises an exception.
246///
247/// # Arguments
248/// * `py` - PyO3 GIL guard
249/// * `locals` - The task locals for the given future
250/// * `fut` - The Rust future to be converted
251///
252/// # Examples
253///
254/// ```
255/// use std::time::Duration;
256///
257/// use pyo3::prelude::*;
258///
259/// /// Awaitable sleep function
260/// #[pyfunction]
261/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
262///     let secs = secs.extract()?;
263///     pyo3_asyncio::async_std::future_into_py_with_locals(
264///         py,
265///         pyo3_asyncio::async_std::get_current_locals(py)?,
266///         async move {
267///             async_std::task::sleep(Duration::from_secs(secs)).await;
268///             Python::with_gil(|py| Ok(py.None()))
269///         }
270///     )
271/// }
272/// ```
273pub fn future_into_py_with_locals<F, T>(py: Python, locals: TaskLocals, fut: F) -> PyResult<&PyAny>
274where
275    F: Future<Output = PyResult<T>> + Send + 'static,
276    T: IntoPy<PyObject>,
277{
278    generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
279}
280
281/// Convert a Rust Future into a Python awaitable
282///
283/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
284/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
285///
286/// Python `contextvars` are preserved when calling async Python functions within the Rust future
287/// via [`into_future`] (new behaviour in `v0.15`).
288///
289/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
290/// unfortunately fail to resolve them when called within the Rust future. This is because the
291/// function is being called from a Rust thread, not inside an actual Python coroutine context.
292/// >
293/// > As a workaround, you can get the `contextvars` from the current task locals using
294/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
295/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
296/// synchronous function, and restore the previous context when it returns or raises an exception.
297///
298/// # Arguments
299/// * `py` - The current PyO3 GIL guard
300/// * `fut` - The Rust future to be converted
301///
302/// # Examples
303///
304/// ```
305/// use std::time::Duration;
306///
307/// use pyo3::prelude::*;
308///
309/// /// Awaitable sleep function
310/// #[pyfunction]
311/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
312///     let secs = secs.extract()?;
313///     pyo3_asyncio::async_std::future_into_py(py, async move {
314///         async_std::task::sleep(Duration::from_secs(secs)).await;
315///         Ok(())
316///     })
317/// }
318/// ```
319pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
320where
321    F: Future<Output = PyResult<T>> + Send + 'static,
322    T: IntoPy<PyObject>,
323{
324    generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
325}
326
327/// Convert a `!Send` Rust Future into a Python awaitable
328///
329/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
330/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
331///
332/// Python `contextvars` are preserved when calling async Python functions within the Rust future
333/// via [`into_future`] (new behaviour in `v0.15`).
334///
335/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
336/// unfortunately fail to resolve them when called within the Rust future. This is because the
337/// function is being called from a Rust thread, not inside an actual Python coroutine context.
338/// >
339/// > As a workaround, you can get the `contextvars` from the current task locals using
340/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
341/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
342/// synchronous function, and restore the previous context when it returns or raises an exception.
343///
344/// # Arguments
345/// * `py` - PyO3 GIL guard
346/// * `locals` - The task locals for the given future
347/// * `fut` - The Rust future to be converted
348///
349/// # Examples
350///
351/// ```
352/// use std::{rc::Rc, time::Duration};
353///
354/// use pyo3::prelude::*;
355///
356/// /// Awaitable non-send sleep function
357/// #[pyfunction]
358/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
359///     // Rc is non-send so it cannot be passed into pyo3_asyncio::async_std::future_into_py
360///     let secs = Rc::new(secs);
361///     Ok(pyo3_asyncio::async_std::local_future_into_py_with_locals(
362///         py,
363///         pyo3_asyncio::async_std::get_current_locals(py)?,
364///         async move {
365///             async_std::task::sleep(Duration::from_secs(*secs)).await;
366///             Python::with_gil(|py| Ok(py.None()))
367///         }
368///     )?.into())
369/// }
370///
371/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
372/// #[pyo3_asyncio::async_std::main]
373/// async fn main() -> PyResult<()> {
374///     Python::with_gil(|py| {
375///         let py_future = sleep_for(py, 1)?;
376///         pyo3_asyncio::async_std::into_future(py_future)
377///     })?
378///     .await?;
379///
380///     Ok(())
381/// }
382/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
383/// # fn main() {}
384/// ```
385#[deprecated(
386    since = "0.18.0",
387    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
388)]
389#[allow(deprecated)]
390pub fn local_future_into_py_with_locals<F, T>(
391    py: Python,
392    locals: TaskLocals,
393    fut: F,
394) -> PyResult<&PyAny>
395where
396    F: Future<Output = PyResult<T>> + 'static,
397    T: IntoPy<PyObject>,
398{
399    generic::local_future_into_py_with_locals::<AsyncStdRuntime, _, T>(py, locals, fut)
400}
401
402/// Convert a `!Send` Rust Future into a Python awaitable
403///
404/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
405/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
406///
407/// Python `contextvars` are preserved when calling async Python functions within the Rust future
408/// via [`into_future`] (new behaviour in `v0.15`).
409///
410/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
411/// unfortunately fail to resolve them when called within the Rust future. This is because the
412/// function is being called from a Rust thread, not inside an actual Python coroutine context.
413/// >
414/// > As a workaround, you can get the `contextvars` from the current task locals using
415/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
416/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
417/// synchronous function, and restore the previous context when it returns or raises an exception.
418///
419/// # Arguments
420/// * `py` - The current PyO3 GIL guard
421/// * `fut` - The Rust future to be converted
422///
423/// # Examples
424///
425/// ```
426/// use std::{rc::Rc, time::Duration};
427///
428/// use pyo3::prelude::*;
429///
430/// /// Awaitable non-send sleep function
431/// #[pyfunction]
432/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
433///     // Rc is non-send so it cannot be passed into pyo3_asyncio::async_std::future_into_py
434///     let secs = Rc::new(secs);
435///     pyo3_asyncio::async_std::local_future_into_py(py, async move {
436///         async_std::task::sleep(Duration::from_secs(*secs)).await;
437///         Ok(())
438///     })
439/// }
440///
441/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
442/// #[pyo3_asyncio::async_std::main]
443/// async fn main() -> PyResult<()> {
444///     Python::with_gil(|py| {
445///         let py_future = sleep_for(py, 1)?;
446///         pyo3_asyncio::async_std::into_future(py_future)
447///     })?
448///     .await?;
449///
450///     Ok(())
451/// }
452/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
453/// # fn main() {}
454/// ```
455#[deprecated(
456    since = "0.18.0",
457    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
458)]
459#[allow(deprecated)]
460pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
461where
462    F: Future<Output = PyResult<T>> + 'static,
463    T: IntoPy<PyObject>,
464{
465    generic::local_future_into_py::<AsyncStdRuntime, _, T>(py, fut)
466}
467
468/// Convert a Python `awaitable` into a Rust Future
469///
470/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
471/// completion handler sends the result of this Task through a
472/// `futures::channel::oneshot::Sender<PyResult<PyObject>>` and the future returned by this function
473/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<PyObject>>`.
474///
475/// # Arguments
476/// * `awaitable` - The Python `awaitable` to be converted
477///
478/// # Examples
479///
480/// ```
481/// use std::time::Duration;
482///
483/// use pyo3::prelude::*;
484///
485/// const PYTHON_CODE: &'static str = r#"
486/// import asyncio
487///
488/// async def py_sleep(duration):
489///     await asyncio.sleep(duration)
490/// "#;
491///
492/// async fn py_sleep(seconds: f32) -> PyResult<()> {
493///     let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
494///         Ok(
495///             PyModule::from_code(
496///                 py,
497///                 PYTHON_CODE,
498///                 "test_into_future/test_mod.py",
499///                 "test_mod"
500///             )?
501///             .into()
502///         )
503///     })?;
504///
505///     Python::with_gil(|py| {
506///         pyo3_asyncio::async_std::into_future(
507///             test_mod
508///                 .call_method1(py, "py_sleep", (seconds.into_py(py),))?
509///                 .as_ref(py),
510///         )
511///     })?
512///     .await?;
513///     Ok(())
514/// }
515/// ```
516pub fn into_future(awaitable: &PyAny) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
517    generic::into_future::<AsyncStdRuntime>(awaitable)
518}
519
520/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
521///
522/// **This API is marked as unstable** and is only available when the
523/// `unstable-streams` crate feature is enabled. This comes with no
524/// stability guarantees, and could be changed or removed at any time.
525///
526/// # Arguments
527/// * `gen` - The Python async generator to be converted
528///
529/// # Examples
530/// ```
531/// use pyo3::prelude::*;
532/// use futures::{StreamExt, TryStreamExt};
533///
534/// const TEST_MOD: &str = r#"
535/// import asyncio
536///
537/// async def gen():
538///     for i in range(10):
539///         await asyncio.sleep(0.1)
540///         yield i
541/// "#;
542///
543/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
544/// # #[pyo3_asyncio::async_std::main]
545/// # async fn main() -> PyResult<()> {
546/// let stream = Python::with_gil(|py| {
547///     let test_mod = PyModule::from_code(
548///         py,
549///         TEST_MOD,
550///         "test_rust_coroutine/test_mod.py",
551///         "test_mod",
552///     )?;
553///
554///     pyo3_asyncio::async_std::into_stream_v1(test_mod.call_method0("gen")?)
555/// })?;
556///
557/// let vals = stream
558///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
559///     .try_collect::<Vec<i32>>()
560///     .await?;
561///
562/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
563///
564/// Ok(())
565/// # }
566/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
567/// # fn main() {}
568/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<'p>(
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Runtime-agnostic conversion, specialised to the async-std runtime.
    crate::generic::into_stream_v1::<AsyncStdRuntime>(gen)
}
575
576/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
577///
578/// **This API is marked as unstable** and is only available when the
579/// `unstable-streams` crate feature is enabled. This comes with no
580/// stability guarantees, and could be changed or removed at any time.
581///
582/// # Arguments
583/// * `locals` - The current task locals
584/// * `gen` - The Python async generator to be converted
585///
586/// # Examples
587/// ```
588/// use pyo3::prelude::*;
589/// use futures::{StreamExt, TryStreamExt};
590///
591/// const TEST_MOD: &str = r#"
592/// import asyncio
593///
594/// async def gen():
595///     for i in range(10):
596///         await asyncio.sleep(0.1)
597///         yield i
598/// "#;
599///
600/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
601/// # #[pyo3_asyncio::async_std::main]
602/// # async fn main() -> PyResult<()> {
603/// let stream = Python::with_gil(|py| {
604///     let test_mod = PyModule::from_code(
605///         py,
606///         TEST_MOD,
607///         "test_rust_coroutine/test_mod.py",
608///         "test_mod",
609///     )?;
610///
611///     pyo3_asyncio::async_std::into_stream_with_locals_v1(
612///         pyo3_asyncio::async_std::get_current_locals(py)?,
613///         test_mod.call_method0("gen")?
614///     )
615/// })?;
616///
617/// let vals = stream
618///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
619///     .try_collect::<Vec<i32>>()
620///     .await?;
621///
622/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
623///
624/// Ok(())
625/// # }
626/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
627/// # fn main() {}
628/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1<'p>(
    locals: TaskLocals,
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Runtime-agnostic conversion, specialised to the async-std runtime.
    crate::generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
}
636
637/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
638///
639/// **This API is marked as unstable** and is only available when the
640/// `unstable-streams` crate feature is enabled. This comes with no
641/// stability guarantees, and could be changed or removed at any time.
642///
643/// # Arguments
644/// * `locals` - The current task locals
645/// * `gen` - The Python async generator to be converted
646///
647/// # Examples
648/// ```
649/// use pyo3::prelude::*;
650/// use futures::{StreamExt, TryStreamExt};
651///
652/// const TEST_MOD: &str = r#"
653/// import asyncio
654///
655/// async def gen():
656///     for i in range(10):
657///         await asyncio.sleep(0.1)
658///         yield i
659/// "#;
660///
661/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
662/// # #[pyo3_asyncio::async_std::main]
663/// # async fn main() -> PyResult<()> {
664/// let stream = Python::with_gil(|py| {
665///     let test_mod = PyModule::from_code(
666///         py,
667///         TEST_MOD,
668///         "test_rust_coroutine/test_mod.py",
669///         "test_mod",
670///     )?;
671///
672///     pyo3_asyncio::async_std::into_stream_with_locals_v2(
673///         pyo3_asyncio::async_std::get_current_locals(py)?,
674///         test_mod.call_method0("gen")?
675///     )
676/// })?;
677///
678/// let vals = stream
679///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
680///     .try_collect::<Vec<i32>>()
681///     .await?;
682///
683/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
684///
685/// Ok(())
686/// # }
687/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
688/// # fn main() {}
689/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2<'p>(
    locals: TaskLocals,
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Runtime-agnostic conversion, specialised to the async-std runtime.
    crate::generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
}
697
698/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
699///
700/// **This API is marked as unstable** and is only available when the
701/// `unstable-streams` crate feature is enabled. This comes with no
702/// stability guarantees, and could be changed or removed at any time.
703///
704/// # Arguments
705/// * `gen` - The Python async generator to be converted
706///
707/// # Examples
708/// ```
709/// use pyo3::prelude::*;
710/// use futures::{StreamExt, TryStreamExt};
711///
712/// const TEST_MOD: &str = r#"
713/// import asyncio
714///
715/// async def gen():
716///     for i in range(10):
717///         await asyncio.sleep(0.1)
718///         yield i
719/// "#;
720///
721/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
722/// # #[pyo3_asyncio::async_std::main]
723/// # async fn main() -> PyResult<()> {
724/// let stream = Python::with_gil(|py| {
725///     let test_mod = PyModule::from_code(
726///         py,
727///         TEST_MOD,
728///         "test_rust_coroutine/test_mod.py",
729///         "test_mod",
730///     )?;
731///
732///     pyo3_asyncio::async_std::into_stream_v2(test_mod.call_method0("gen")?)
733/// })?;
734///
735/// let vals = stream
736///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
737///     .try_collect::<Vec<i32>>()
738///     .await?;
739///
740/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
741///
742/// Ok(())
743/// # }
744/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
745/// # fn main() {}
746/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<'p>(
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Runtime-agnostic conversion, specialised to the async-std runtime.
    crate::generic::into_stream_v2::<AsyncStdRuntime>(gen)
}