pyo3_async_runtimes/
async_std.rs

1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>async-std-runtime</code></span> PyO3 Asyncio functions specific to the async-std runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//!
9//! are only available when the `unstable-streams` Cargo feature is enabled:
10//!
11//! ```toml
12//! [dependencies.pyo3-async-runtimes]
13//! version = "0.24"
14//! features = ["unstable-streams"]
15//! ```
16
17use async_std::task;
18use futures::FutureExt;
19use pyo3::prelude::*;
20use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin};
21
22use crate::{
23    generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt},
24    TaskLocals,
25};
26
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Re-exports for use by this crate's macros
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// Re-export of `async_std::task::spawn_blocking` so the `#[test]` macro can use it
    /// without requiring an external dependency
    pub use async_std::task::spawn_blocking;
}
34
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> Provides the boilerplate for the `async-std` runtime and runs an async fn as main
///
/// This is the `async_std_main` macro from `pyo3_async_runtimes_macros`, re-exported as `main`.
#[cfg(feature = "attributes")]
pub use pyo3_async_runtimes_macros::async_std_main as main;
38
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
/// Registers an `async-std` test with the `pyo3-async-runtimes` test harness
///
/// This is the `async_std_test` macro from `pyo3_async_runtimes_macros`, re-exported as `test`.
#[cfg(all(feature = "attributes", feature = "testing"))]
pub use pyo3_async_runtimes_macros::async_std_test as test;
44
45struct AsyncStdJoinErr(Box<dyn Any + Send + 'static>);
46
47impl JoinError for AsyncStdJoinErr {
48    fn is_panic(&self) -> bool {
49        true
50    }
51    fn into_panic(self) -> Box<dyn Any + Send + 'static> {
52        self.0
53    }
54}
55
// Per-task slot for the `TaskLocals` installed by the `scope`/`scope_local` implementations
// below. It starts out as `None`; the `RefCell` lets the slot be swapped through the shared
// reference that `with`/`try_with` hand out.
async_std::task_local! {
    static TASK_LOCALS: RefCell<Option<TaskLocals>> = RefCell::new(None);
}
59
60struct AsyncStdRuntime;
61
62impl Runtime for AsyncStdRuntime {
63    type JoinError = AsyncStdJoinErr;
64    type JoinHandle = task::JoinHandle<Result<(), AsyncStdJoinErr>>;
65
66    fn spawn<F>(fut: F) -> Self::JoinHandle
67    where
68        F: Future<Output = ()> + Send + 'static,
69    {
70        task::spawn(async move {
71            AssertUnwindSafe(fut)
72                .catch_unwind()
73                .await
74                .map_err(AsyncStdJoinErr)
75        })
76    }
77}
78
79impl ContextExt for AsyncStdRuntime {
80    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
81    where
82        F: Future<Output = R> + Send + 'static,
83    {
84        let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
85        Box::pin(async move {
86            let result = fut.await;
87            TASK_LOCALS.with(|c| c.replace(old));
88            result
89        })
90    }
91
92    fn get_task_locals() -> Option<TaskLocals> {
93        TASK_LOCALS
94            .try_with(|c| {
95                c.borrow()
96                    .as_ref()
97                    .map(|locals| Python::with_gil(|py| locals.clone_ref(py)))
98            })
99            .unwrap_or_default()
100    }
101}
102
103impl SpawnLocalExt for AsyncStdRuntime {
104    fn spawn_local<F>(fut: F) -> Self::JoinHandle
105    where
106        F: Future<Output = ()> + 'static,
107    {
108        task::spawn_local(async move {
109            fut.await;
110            Ok(())
111        })
112    }
113}
114
115impl LocalContextExt for AsyncStdRuntime {
116    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
117    where
118        F: Future<Output = R> + 'static,
119    {
120        let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
121        Box::pin(async move {
122            let result = fut.await;
123            TASK_LOCALS.with(|c| c.replace(old));
124            result
125        })
126    }
127}
128
129/// Set the task local event loop for the given future
130pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
131where
132    F: Future<Output = R> + Send + 'static,
133{
134    AsyncStdRuntime::scope(locals, fut).await
135}
136
137/// Set the task local event loop for the given !Send future
138pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
139where
140    F: Future<Output = R> + 'static,
141{
142    AsyncStdRuntime::scope_local(locals, fut).await
143}
144
/// Get the current event loop from either Python or Rust async task local context
///
/// This function first checks if the runtime has a task-local reference to the Python event loop.
/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
/// associated with the current OS thread.
///
/// # Arguments
/// * `py` - The current PyO3 GIL guard
pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
    generic::get_current_loop::<AsyncStdRuntime>(py)
}
153
/// Either copy the task locals from the current task OR get the current running loop and
/// contextvars from Python.
///
/// # Arguments
/// * `py` - The current PyO3 GIL guard
pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
    generic::get_current_locals::<AsyncStdRuntime>(py)
}
159
160/// Run the event loop until the given Future completes
161///
162/// The event loop runs until the given future is complete.
163///
164/// After this function returns, the event loop can be resumed with [`run_until_complete`]
165///
166/// # Arguments
167/// * `event_loop` - The Python event loop that should run the future
168/// * `fut` - The future to drive to completion
169///
170/// # Examples
171///
172/// ```
173/// # use std::time::Duration;
174/// #
175/// # use pyo3::prelude::*;
176/// #
177/// # pyo3::prepare_freethreaded_python();
178/// #
179/// # Python::with_gil(|py| -> PyResult<()> {
180/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
181/// pyo3_async_runtimes::async_std::run_until_complete(event_loop, async move {
182///     async_std::task::sleep(Duration::from_secs(1)).await;
183///     Ok(())
184/// })?;
185/// # Ok(())
186/// # }).unwrap();
187/// ```
188pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
189where
190    F: Future<Output = PyResult<T>> + Send + 'static,
191    T: Send + Sync + 'static,
192{
193    generic::run_until_complete::<AsyncStdRuntime, _, T>(&event_loop, fut)
194}
195
196/// Run the event loop until the given Future completes
197///
198/// # Arguments
199/// * `py` - The current PyO3 GIL guard
200/// * `fut` - The future to drive to completion
201///
202/// # Examples
203///
204/// ```no_run
205/// # use std::time::Duration;
206/// #
207/// # use pyo3::prelude::*;
208/// #
209/// fn main() {
210///     // call this or use pyo3 0.14 "auto-initialize" feature
211///     pyo3::prepare_freethreaded_python();
212///
213///     Python::with_gil(|py| {
214///         pyo3_async_runtimes::async_std::run(py, async move {
215///             async_std::task::sleep(Duration::from_secs(1)).await;
216///             Ok(())
217///         })
218///         .map_err(|e| {
219///             e.print_and_set_sys_last_vars(py);
220///         })
221///         .unwrap();
222///     })
223/// }
224/// ```
225pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
226where
227    F: Future<Output = PyResult<T>> + Send + 'static,
228    T: Send + Sync + 'static,
229{
230    generic::run::<AsyncStdRuntime, F, T>(py, fut)
231}
232
233/// Convert a Rust Future into a Python awaitable
234///
235/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
236/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
237///
238/// Python `contextvars` are preserved when calling async Python functions within the Rust future
239/// via [`into_future`] (new behaviour in `v0.15`).
240///
241/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
242/// > unfortunately fail to resolve them when called within the Rust future. This is because the
243/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
244/// >
245/// > As a workaround, you can get the `contextvars` from the current task locals using
246/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
247/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
248/// > synchronous function, and restore the previous context when it returns or raises an exception.
249///
250/// # Arguments
251/// * `py` - PyO3 GIL guard
252/// * `locals` - The task locals for the given future
253/// * `fut` - The Rust future to be converted
254///
255/// # Examples
256///
257/// ```
258/// use std::time::Duration;
259///
260/// use pyo3::prelude::*;
261///
262/// /// Awaitable sleep function
263/// #[pyfunction]
264/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
265///     let secs = secs.extract()?;
266///     pyo3_async_runtimes::async_std::future_into_py_with_locals(
267///         py,
268///         pyo3_async_runtimes::async_std::get_current_locals(py)?,
269///         async move {
270///             async_std::task::sleep(Duration::from_secs(secs)).await;
271///             Python::with_gil(|py| Ok(py.None()))
272///         }
273///     )
274/// }
275/// ```
276pub fn future_into_py_with_locals<F, T>(
277    py: Python,
278    locals: TaskLocals,
279    fut: F,
280) -> PyResult<Bound<PyAny>>
281where
282    F: Future<Output = PyResult<T>> + Send + 'static,
283    T: for<'py> IntoPyObject<'py>,
284{
285    generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
286}
287
288/// Convert a Rust Future into a Python awaitable
289///
290/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
291/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
292///
293/// Python `contextvars` are preserved when calling async Python functions within the Rust future
294/// via [`into_future`] (new behaviour in `v0.15`).
295///
296/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
297/// > unfortunately fail to resolve them when called within the Rust future. This is because the
298/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
299/// >
300/// > As a workaround, you can get the `contextvars` from the current task locals using
301/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
302/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
303/// > synchronous function, and restore the previous context when it returns or raises an exception.
304///
305/// # Arguments
306/// * `py` - The current PyO3 GIL guard
307/// * `fut` - The Rust future to be converted
308///
309/// # Examples
310///
311/// ```
312/// use std::time::Duration;
313///
314/// use pyo3::prelude::*;
315///
316/// /// Awaitable sleep function
317/// #[pyfunction]
318/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
319///     let secs = secs.extract()?;
320///     pyo3_async_runtimes::async_std::future_into_py(py, async move {
321///         async_std::task::sleep(Duration::from_secs(secs)).await;
322///         Ok(())
323///     })
324/// }
325/// ```
326pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
327where
328    F: Future<Output = PyResult<T>> + Send + 'static,
329    T: for<'py> IntoPyObject<'py>,
330{
331    generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
332}
333
334/// Convert a `!Send` Rust Future into a Python awaitable
335///
336/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
337/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
338///
339/// Python `contextvars` are preserved when calling async Python functions within the Rust future
340/// via [`into_future`] (new behaviour in `v0.15`).
341///
342/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
343/// > unfortunately fail to resolve them when called within the Rust future. This is because the
344/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
345/// >
346/// > As a workaround, you can get the `contextvars` from the current task locals using
347/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
348/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
349/// > synchronous function, and restore the previous context when it returns or raises an exception.
350///
351/// # Arguments
352/// * `py` - PyO3 GIL guard
353/// * `locals` - The task locals for the given future
354/// * `fut` - The Rust future to be converted
355///
356/// # Examples
357///
358/// ```
359/// use std::{rc::Rc, time::Duration};
360///
361/// use pyo3::prelude::*;
362///
363/// /// Awaitable non-send sleep function
364/// #[pyfunction]
365/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
366///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
367///     let secs = Rc::new(secs);
368///     Ok(pyo3_async_runtimes::async_std::local_future_into_py_with_locals(
369///         py,
370///         pyo3_async_runtimes::async_std::get_current_locals(py)?,
371///         async move {
372///             async_std::task::sleep(Duration::from_secs(*secs)).await;
373///             Python::with_gil(|py| Ok(py.None()))
374///         }
375///     )?.into())
376/// }
377///
378/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
379/// #[pyo3_async_runtimes::async_std::main]
380/// async fn main() -> PyResult<()> {
381///     Python::with_gil(|py| {
382///         let py_future = sleep_for(py, 1)?;
383///         pyo3_async_runtimes::async_std::into_future(py_future)
384///     })?
385///     .await?;
386///
387///     Ok(())
388/// }
389/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
390/// # fn main() {}
391/// ```
392#[deprecated(
393    since = "0.18.0",
394    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
395)]
396#[allow(deprecated)]
397pub fn local_future_into_py_with_locals<F, T>(
398    py: Python,
399    locals: TaskLocals,
400    fut: F,
401) -> PyResult<Bound<PyAny>>
402where
403    F: Future<Output = PyResult<T>> + 'static,
404    T: for<'py> IntoPyObject<'py>,
405{
406    generic::local_future_into_py_with_locals::<AsyncStdRuntime, _, T>(py, locals, fut)
407}
408
409/// Convert a `!Send` Rust Future into a Python awaitable
410///
411/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
412/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
413///
414/// Python `contextvars` are preserved when calling async Python functions within the Rust future
415/// via [`into_future`] (new behaviour in `v0.15`).
416///
417/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
418/// > unfortunately fail to resolve them when called within the Rust future. This is because the
419/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
420/// >
421/// > As a workaround, you can get the `contextvars` from the current task locals using
422/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
423/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
424/// > synchronous function, and restore the previous context when it returns or raises an exception.
425///
426/// # Arguments
427/// * `py` - The current PyO3 GIL guard
428/// * `fut` - The Rust future to be converted
429///
430/// # Examples
431///
432/// ```
433/// use std::{rc::Rc, time::Duration};
434///
435/// use pyo3::prelude::*;
436///
437/// /// Awaitable non-send sleep function
438/// #[pyfunction]
439/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
440///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
441///     let secs = Rc::new(secs);
442///     pyo3_async_runtimes::async_std::local_future_into_py(py, async move {
443///         async_std::task::sleep(Duration::from_secs(*secs)).await;
444///         Ok(())
445///     })
446/// }
447///
448/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
449/// #[pyo3_async_runtimes::async_std::main]
450/// async fn main() -> PyResult<()> {
451///     Python::with_gil(|py| {
452///         let py_future = sleep_for(py, 1)?;
453///         pyo3_async_runtimes::async_std::into_future(py_future)
454///     })?
455///     .await?;
456///
457///     Ok(())
458/// }
459/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
460/// # fn main() {}
461/// ```
462#[deprecated(
463    since = "0.18.0",
464    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
465)]
466#[allow(deprecated)]
467pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
468where
469    F: Future<Output = PyResult<T>> + 'static,
470    T: for<'py> IntoPyObject<'py>,
471{
472    generic::local_future_into_py::<AsyncStdRuntime, _, T>(py, fut)
473}
474
/// Convert a Python `awaitable` into a Rust Future
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<PyObject>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<PyObject>>`.
///
/// The returned future is `Send` and resolves to the `PyResult<PyObject>` produced by the
/// awaitable.
///
/// # Arguments
/// * `awaitable` - The Python `awaitable` to be converted
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use std::ffi::CString;
///
/// use pyo3::prelude::*;
///
/// const PYTHON_CODE: &'static str = r#"
/// import asyncio
///
/// async def py_sleep(duration):
///     await asyncio.sleep(duration)
/// "#;
///
/// async fn py_sleep(seconds: f32) -> PyResult<()> {
///     let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
///         Ok(
///             PyModule::from_code(
///                 py,
///                 &CString::new(PYTHON_CODE).unwrap(),
///                 &CString::new("test_into_future/test_mod.py").unwrap(),
///                 &CString::new("test_mod").unwrap()
///             )?
///             .into()
///         )
///     })?;
///
///     Python::with_gil(|py| {
///         pyo3_async_runtimes::async_std::into_future(
///             test_mod
///                 .call_method1(py, "py_sleep", (seconds.into_pyobject(py).unwrap(),))?
///                 .into_bound(py),
///         )
///     })?
///     .await?;
///     Ok(())
/// }
/// ```
pub fn into_future(
    awaitable: Bound<PyAny>,
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
    generic::into_future::<AsyncStdRuntime>(awaitable)
}
529
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// The items of the returned stream are `PyResult<PyObject>`.
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// # Arguments
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_async_runtimes::async_std::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         &CString::new(TEST_MOD).unwrap(),
///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
///         &CString::new("test_mod").unwrap(),
///     )?;
///
///     pyo3_async_runtimes::async_std::into_stream_v1(test_mod.call_method0("gen")?)
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    generic::into_stream_v1::<AsyncStdRuntime>(gen)
}
586
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// The items of the returned stream are `PyResult<PyObject>`.
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// # Arguments
/// * `locals` - The current task locals
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_async_runtimes::async_std::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         &CString::new(TEST_MOD).unwrap(),
///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
///         &CString::new("test_mod").unwrap(),
///     )?;
///
///     pyo3_async_runtimes::async_std::into_stream_with_locals_v1(
///         pyo3_async_runtimes::async_std::get_current_locals(py)?,
///         test_mod.call_method0("gen")?
///     )
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
}
648
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// The items of the returned stream are plain `PyObject`s, not `PyResult<PyObject>` as in
/// the `v1` functions.
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// # Arguments
/// * `locals` - The current task locals
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_async_runtimes::async_std::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         &CString::new(TEST_MOD).unwrap(),
///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
///         &CString::new("test_mod").unwrap(),
///     )?;
///
///     pyo3_async_runtimes::async_std::into_stream_with_locals_v2(
///         pyo3_async_runtimes::async_std::get_current_locals(py)?,
///         test_mod.call_method0("gen")?
///     )
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
}
710
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// The items of the returned stream are plain `PyObject`s, not `PyResult<PyObject>` as in
/// the `v1` functions.
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// # Arguments
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_async_runtimes::async_std::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         &CString::new(TEST_MOD).unwrap(),
///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
///         &CString::new("test_mod").unwrap(),
///     )?;
///
///     pyo3_async_runtimes::async_std::into_stream_v2(test_mod.call_method0("gen")?)
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    generic::into_stream_v2::<AsyncStdRuntime>(gen)
}