pyo3_async_runtimes/async_std.rs
1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>async-std-runtime</code></span> PyO3 Asyncio functions specific to the async-std runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//!
9//! are only available when the `unstable-streams` Cargo feature is enabled:
10//!
11//! ```toml
12//! [dependencies.pyo3-async-runtimes]
13//! version = "0.24"
14//! features = ["unstable-streams"]
15//! ```
16
17use async_std::task;
18use futures::FutureExt;
19use pyo3::prelude::*;
20use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin};
21
22use crate::{
23 generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt},
24 TaskLocals,
25};
26
27/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
28/// Re-exports for use by this crate's macros (only compiled with the `attributes` feature)
29#[cfg(feature = "attributes")]
30pub mod re_exports {
31 /// Re-export of `async_std::task::spawn_blocking` so the `#[test]` macro can use it without an external dependency
32 pub use async_std::task::spawn_blocking;
33}
34
35/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> Provides the boilerplate for the `async-std` runtime and runs an async fn as main
36#[cfg(feature = "attributes")]
37pub use pyo3_async_runtimes_macros::async_std_main as main;
38
39/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
40/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
41/// Registers an `async-std` test with the `pyo3-async-runtimes` test harness
42#[cfg(all(feature = "attributes", feature = "testing"))]
43pub use pyo3_async_runtimes_macros::async_std_test as test;
44
45struct AsyncStdJoinErr(Box<dyn Any + Send + 'static>);
46
47impl JoinError for AsyncStdJoinErr {
48 fn is_panic(&self) -> bool {
49 true
50 }
51 fn into_panic(self) -> Box<dyn Any + Send + 'static> {
52 self.0
53 }
54}
55
// Task-local slot holding the `TaskLocals` of the current async-std task.
// Starts out as `None`; `scope` / `scope_local` below swap a value in and put the
// previous one back once the scoped future has completed.
56async_std::task_local! {
57 static TASK_LOCALS: RefCell<Option<TaskLocals>> = RefCell::new(None);
58}
59
60struct AsyncStdRuntime;
61
62impl Runtime for AsyncStdRuntime {
63 type JoinError = AsyncStdJoinErr;
64 type JoinHandle = task::JoinHandle<Result<(), AsyncStdJoinErr>>;
65
66 fn spawn<F>(fut: F) -> Self::JoinHandle
67 where
68 F: Future<Output = ()> + Send + 'static,
69 {
70 task::spawn(async move {
71 AssertUnwindSafe(fut)
72 .catch_unwind()
73 .await
74 .map_err(AsyncStdJoinErr)
75 })
76 }
77
78 fn spawn_blocking<F>(f: F) -> Self::JoinHandle
79 where
80 F: FnOnce() + Send + 'static,
81 {
82 task::spawn_blocking(move || {
83 panic::catch_unwind(AssertUnwindSafe(f)).map_err(|e| AsyncStdJoinErr(Box::new(e)))
84 })
85 }
86}
87
88impl ContextExt for AsyncStdRuntime {
89 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
90 where
91 F: Future<Output = R> + Send + 'static,
92 {
93 let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
94 Box::pin(async move {
95 let result = fut.await;
96 TASK_LOCALS.with(|c| c.replace(old));
97 result
98 })
99 }
100
101 fn get_task_locals() -> Option<TaskLocals> {
102 TASK_LOCALS
103 .try_with(|c| c.borrow().as_ref().map(|locals| locals.clone()))
104 .unwrap_or_default()
105 }
106}
107
108impl SpawnLocalExt for AsyncStdRuntime {
109 fn spawn_local<F>(fut: F) -> Self::JoinHandle
110 where
111 F: Future<Output = ()> + 'static,
112 {
113 task::spawn_local(async move {
114 fut.await;
115 Ok(())
116 })
117 }
118}
119
120impl LocalContextExt for AsyncStdRuntime {
121 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
122 where
123 F: Future<Output = R> + 'static,
124 {
125 let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
126 Box::pin(async move {
127 let result = fut.await;
128 TASK_LOCALS.with(|c| c.replace(old));
129 result
130 })
131 }
132}
133
134/// Set the task local event loop for the given future
135pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
136where
137 F: Future<Output = R> + Send + 'static,
138{
139 AsyncStdRuntime::scope(locals, fut).await
140}
141
142/// Set the task local event loop for the given !Send future
143pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
144where
145 F: Future<Output = R> + 'static,
146{
147 AsyncStdRuntime::scope_local(locals, fut).await
148}
149
150/// Get the current event loop from either Python or Rust async task local context
151///
152/// This function first checks if the runtime has a task-local reference to the Python event loop.
153/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
154/// associated with the current OS thread.
155pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
156 generic::get_current_loop::<AsyncStdRuntime>(py)
157}
158
159/// Either copy the task locals from the current task OR get the current running loop and
160/// contextvars from Python.
161pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
162 generic::get_current_locals::<AsyncStdRuntime>(py)
163}
164
165/// Run the event loop until the given Future completes
166///
167/// The event loop runs until the given future is complete.
168///
169/// After this function returns, the event loop can be resumed with [`run_until_complete`].
170///
171/// # Arguments
172/// * `event_loop` - The Python event loop that should run the future
173/// * `fut` - The future to drive to completion
174///
175/// # Examples
176///
177/// ```
178/// # use std::time::Duration;
179/// #
180/// # use pyo3::prelude::*;
181/// #
182/// # Python::initialize();
183/// #
184/// # Python::attach(|py| -> PyResult<()> {
185/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
186/// pyo3_async_runtimes::async_std::run_until_complete(event_loop, async move {
187/// async_std::task::sleep(Duration::from_secs(1)).await;
188/// Ok(())
189/// })?;
190/// # Ok(())
191/// # }).unwrap();
192/// ```
193pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
194where
195 F: Future<Output = PyResult<T>> + Send + 'static,
196 T: Send + Sync + 'static,
197{
198 generic::run_until_complete::<AsyncStdRuntime, _, T>(&event_loop, fut)
199}
200
201/// Run the event loop until the given Future completes
202///
203/// # Arguments
204/// * `py` - The current PyO3 GIL guard
205/// * `fut` - The future to drive to completion
206///
207/// # Examples
208///
209/// ```no_run
210/// # use std::time::Duration;
211/// #
212/// # use pyo3::prelude::*;
213/// #
214/// fn main() {
215/// // call this or enable pyo3's "auto-initialize" feature
216/// Python::initialize();
217///
218/// Python::attach(|py| {
219/// pyo3_async_runtimes::async_std::run(py, async move {
220/// async_std::task::sleep(Duration::from_secs(1)).await;
221/// Ok(())
222/// })
223/// .map_err(|e| {
224/// e.print_and_set_sys_last_vars(py);
225/// })
226/// .unwrap();
227/// })
228/// }
229/// ```
230pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
231where
232 F: Future<Output = PyResult<T>> + Send + 'static,
233 T: Send + Sync + 'static,
234{
235 generic::run::<AsyncStdRuntime, F, T>(py, fut)
236}
237
238/// Convert a Rust Future into a Python awaitable
239///
240/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
241/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
242///
243/// Python `contextvars` are preserved when calling async Python functions within the Rust future
244/// via [`into_future`] (new behaviour in `v0.15`).
245///
246/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
247/// > unfortunately fail to resolve them when called within the Rust future. This is because the
248/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
249/// >
250/// > As a workaround, you can get the `contextvars` from the current task locals using
251/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
252/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
253/// > synchronous function, and restore the previous context when it returns or raises an exception.
254///
255/// # Arguments
256/// * `py` - PyO3 GIL guard
257/// * `locals` - The task locals for the given future
258/// * `fut` - The Rust future to be converted
259///
260/// # Examples
261///
262/// ```
263/// use std::time::Duration;
264///
265/// use pyo3::prelude::*;
266///
267/// /// Awaitable sleep function
268/// #[pyfunction]
269/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
270/// let secs = secs.extract()?;
271/// pyo3_async_runtimes::async_std::future_into_py_with_locals(
272/// py,
273/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
274/// async move {
275/// async_std::task::sleep(Duration::from_secs(secs)).await;
276/// Python::attach(|py| Ok(py.None()))
277/// }
278/// )
279/// }
280/// ```
281pub fn future_into_py_with_locals<F, T>(
282 py: Python,
283 locals: TaskLocals,
284 fut: F,
285) -> PyResult<Bound<PyAny>>
286where
287 F: Future<Output = PyResult<T>> + Send + 'static,
288 T: for<'py> IntoPyObject<'py> + Send + 'static,
289{
290 generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
291}
292
293/// Convert a Rust Future into a Python awaitable
294///
295/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
296/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
297///
298/// Python `contextvars` are preserved when calling async Python functions within the Rust future
299/// via [`into_future`] (new behaviour in `v0.15`).
300///
301/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
302/// > unfortunately fail to resolve them when called within the Rust future. This is because the
303/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
304/// >
305/// > As a workaround, you can get the `contextvars` from the current task locals using
306/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
307/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
308/// > synchronous function, and restore the previous context when it returns or raises an exception.
309///
310/// # Arguments
311/// * `py` - The current PyO3 GIL guard
312/// * `fut` - The Rust future to be converted
313///
314/// # Examples
315///
316/// ```
317/// use std::time::Duration;
318///
319/// use pyo3::prelude::*;
320///
321/// /// Awaitable sleep function
322/// #[pyfunction]
323/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
324/// let secs = secs.extract()?;
325/// pyo3_async_runtimes::async_std::future_into_py(py, async move {
326/// async_std::task::sleep(Duration::from_secs(secs)).await;
327/// Ok(())
328/// })
329/// }
330/// ```
331pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
332where
333 F: Future<Output = PyResult<T>> + Send + 'static,
334 T: for<'py> IntoPyObject<'py> + Send + 'static,
335{
336 generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
337}
338
339/// Convert a `!Send` Rust Future into a Python awaitable
340///
341/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
342/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
343///
344/// Python `contextvars` are preserved when calling async Python functions within the Rust future
345/// via [`into_future`] (new behaviour in `v0.15`).
346///
347/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
348/// > unfortunately fail to resolve them when called within the Rust future. This is because the
349/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
350/// >
351/// > As a workaround, you can get the `contextvars` from the current task locals using
352/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
353/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
354/// > synchronous function, and restore the previous context when it returns or raises an exception.
355///
356/// # Arguments
357/// * `py` - PyO3 GIL guard
358/// * `locals` - The task locals for the given future
359/// * `fut` - The Rust future to be converted
360///
361/// # Examples
362///
363/// ```
364/// use std::{rc::Rc, time::Duration};
365///
366/// use pyo3::prelude::*;
367///
368/// /// Awaitable non-send sleep function
369/// #[pyfunction]
370/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
371/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
372/// let secs = Rc::new(secs);
373/// Ok(pyo3_async_runtimes::async_std::local_future_into_py_with_locals(
374/// py,
375/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
376/// async move {
377/// async_std::task::sleep(Duration::from_secs(*secs)).await;
378/// Python::attach(|py| Ok(py.None()))
379/// }
380/// )?.into())
381/// }
382///
383/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
384/// #[pyo3_async_runtimes::async_std::main]
385/// async fn main() -> PyResult<()> {
386/// Python::attach(|py| {
387/// let py_future = sleep_for(py, 1)?;
388/// pyo3_async_runtimes::async_std::into_future(py_future)
389/// })?
390/// .await?;
391///
392/// Ok(())
393/// }
394/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
395/// # fn main() {}
396/// ```
397#[deprecated(
398 since = "0.18.0",
399 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
400)]
401#[allow(deprecated)]
402pub fn local_future_into_py_with_locals<F, T>(
403 py: Python,
404 locals: TaskLocals,
405 fut: F,
406) -> PyResult<Bound<PyAny>>
407where
408 F: Future<Output = PyResult<T>> + 'static,
409 T: for<'py> IntoPyObject<'py>,
410{
411 generic::local_future_into_py_with_locals::<AsyncStdRuntime, _, T>(py, locals, fut)
412}
413
414/// Convert a `!Send` Rust Future into a Python awaitable
415///
416/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
417/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
418///
419/// Python `contextvars` are preserved when calling async Python functions within the Rust future
420/// via [`into_future`] (new behaviour in `v0.15`).
421///
422/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
423/// > unfortunately fail to resolve them when called within the Rust future. This is because the
424/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
425/// >
426/// > As a workaround, you can get the `contextvars` from the current task locals using
427/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
428/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
429/// > synchronous function, and restore the previous context when it returns or raises an exception.
430///
431/// # Arguments
432/// * `py` - The current PyO3 GIL guard
433/// * `fut` - The Rust future to be converted
434///
435/// # Examples
436///
437/// ```
438/// use std::{rc::Rc, time::Duration};
439///
440/// use pyo3::prelude::*;
441///
442/// /// Awaitable non-send sleep function
443/// #[pyfunction]
444/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
445/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
446/// let secs = Rc::new(secs);
447/// pyo3_async_runtimes::async_std::local_future_into_py(py, async move {
448/// async_std::task::sleep(Duration::from_secs(*secs)).await;
449/// Ok(())
450/// })
451/// }
452///
453/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
454/// #[pyo3_async_runtimes::async_std::main]
455/// async fn main() -> PyResult<()> {
456/// Python::attach(|py| {
457/// let py_future = sleep_for(py, 1)?;
458/// pyo3_async_runtimes::async_std::into_future(py_future)
459/// })?
460/// .await?;
461///
462/// Ok(())
463/// }
464/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
465/// # fn main() {}
466/// ```
467#[deprecated(
468 since = "0.18.0",
469 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
470)]
471#[allow(deprecated)]
472pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
473where
474 F: Future<Output = PyResult<T>> + 'static,
475 T: for<'py> IntoPyObject<'py>,
476{
477 generic::local_future_into_py::<AsyncStdRuntime, _, T>(py, fut)
478}
479
480/// Convert a Python `awaitable` into a Rust Future
481///
482/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
483/// completion handler sends the result of this Task through a
484/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
485/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
486///
487/// # Arguments
488/// * `awaitable` - The Python `awaitable` to be converted
489///
490/// # Examples
491///
492/// ```
493/// use std::time::Duration;
494/// use std::ffi::CString;
495///
496/// use pyo3::prelude::*;
497///
498/// const PYTHON_CODE: &'static str = r#"
499/// import asyncio
500///
501/// async def py_sleep(duration):
502/// await asyncio.sleep(duration)
503/// "#;
504///
505/// async fn py_sleep(seconds: f32) -> PyResult<()> {
506/// let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
507/// Ok(
508/// PyModule::from_code(
509/// py,
510/// &CString::new(PYTHON_CODE).unwrap(),
511/// &CString::new("test_into_future/test_mod.py").unwrap(),
512/// &CString::new("test_mod").unwrap()
513/// )?
514/// .into()
515/// )
516/// })?;
517///
518/// Python::attach(|py| {
519/// pyo3_async_runtimes::async_std::into_future(
520/// test_mod
521/// .call_method1(py, "py_sleep", (seconds,))?
522/// .into_bound(py),
523/// )
524/// })?
525/// .await?;
526/// Ok(())
527/// }
528/// ```
529pub fn into_future(
530 awaitable: Bound<PyAny>,
531) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
532 generic::into_future::<AsyncStdRuntime>(awaitable)
533}
534
535/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
536///
537/// **This API is marked as unstable** and is only available when the
538/// `unstable-streams` crate feature is enabled. This comes with no
539/// stability guarantees, and could be changed or removed at any time.
540///
541/// # Arguments
542/// * `gen` - The Python async generator to be converted
543///
544/// # Examples
545/// ```
546/// use pyo3::prelude::*;
547/// use futures::{StreamExt, TryStreamExt};
548/// use std::ffi::CString;
549///
550/// const TEST_MOD: &str = r#"
551/// import asyncio
552///
553/// async def gen():
554/// for i in range(10):
555/// await asyncio.sleep(0.1)
556/// yield i
557/// "#;
558///
559/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
560/// # #[pyo3_async_runtimes::async_std::main]
561/// # async fn main() -> PyResult<()> {
562/// let stream = Python::attach(|py| {
563/// let test_mod = PyModule::from_code(
564/// py,
565/// &CString::new(TEST_MOD).unwrap(),
566/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
567/// &CString::new("test_mod").unwrap(),
568/// )?;
569///
570/// pyo3_async_runtimes::async_std::into_stream_v1(test_mod.call_method0("gen")?)
571/// })?;
572///
573/// let vals = stream
574/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
575/// .try_collect::<Vec<i32>>()
576/// .await?;
577///
578/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
579///
580/// Ok(())
581/// # }
582/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
583/// # fn main() {}
584/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin async-std specialisation of the runtime-generic conversion; a setup error is
    // returned to the caller, otherwise the stream is handed back unchanged.
    let stream = generic::into_stream_v1::<AsyncStdRuntime>(gen)?;
    Ok(stream)
}
591
592/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
593///
594/// **This API is marked as unstable** and is only available when the
595/// `unstable-streams` crate feature is enabled. This comes with no
596/// stability guarantees, and could be changed or removed at any time.
597///
598/// # Arguments
599/// * `locals` - The current task locals
600/// * `gen` - The Python async generator to be converted
601///
602/// # Examples
603/// ```
604/// use pyo3::prelude::*;
605/// use futures::{StreamExt, TryStreamExt};
606/// use std::ffi::CString;
607///
608/// const TEST_MOD: &str = r#"
609/// import asyncio
610///
611/// async def gen():
612/// for i in range(10):
613/// await asyncio.sleep(0.1)
614/// yield i
615/// "#;
616///
617/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
618/// # #[pyo3_async_runtimes::async_std::main]
619/// # async fn main() -> PyResult<()> {
620/// let stream = Python::attach(|py| {
621/// let test_mod = PyModule::from_code(
622/// py,
623/// &CString::new(TEST_MOD).unwrap(),
624/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
625/// &CString::new("test_mod").unwrap(),
626/// )?;
627///
628/// pyo3_async_runtimes::async_std::into_stream_with_locals_v1(
629/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
630/// test_mod.call_method0("gen")?
631/// )
632/// })?;
633///
634/// let vals = stream
635/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
636/// .try_collect::<Vec<i32>>()
637/// .await?;
638///
639/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
640///
641/// Ok(())
642/// # }
643/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
644/// # fn main() {}
645/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin async-std specialisation of the runtime-generic conversion; a setup error is
    // returned to the caller, otherwise the stream is handed back unchanged.
    let stream = generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)?;
    Ok(stream)
}
653
654/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
655///
656/// **This API is marked as unstable** and is only available when the
657/// `unstable-streams` crate feature is enabled. This comes with no
658/// stability guarantees, and could be changed or removed at any time.
659///
660/// # Arguments
661/// * `locals` - The current task locals
662/// * `gen` - The Python async generator to be converted
663///
664/// # Examples
665/// ```
666/// use pyo3::prelude::*;
667/// use futures::{StreamExt, TryStreamExt};
668/// use std::ffi::CString;
669///
670/// const TEST_MOD: &str = r#"
671/// import asyncio
672///
673/// async def gen():
674/// for i in range(10):
675/// await asyncio.sleep(0.1)
676/// yield i
677/// "#;
678///
679/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
680/// # #[pyo3_async_runtimes::async_std::main]
681/// # async fn main() -> PyResult<()> {
682/// let stream = Python::attach(|py| {
683/// let test_mod = PyModule::from_code(
684/// py,
685/// &CString::new(TEST_MOD).unwrap(),
686/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
687/// &CString::new("test_mod").unwrap(),
688/// )?;
689///
690/// pyo3_async_runtimes::async_std::into_stream_with_locals_v2(
691/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
692/// test_mod.call_method0("gen")?
693/// )
694/// })?;
695///
696/// let vals = stream
697/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
698/// .try_collect::<Vec<i32>>()
699/// .await?;
700///
701/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
702///
703/// Ok(())
704/// # }
705/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
706/// # fn main() {}
707/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin async-std specialisation of the runtime-generic conversion; a setup error is
    // returned to the caller, otherwise the stream is handed back unchanged.
    let stream = generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)?;
    Ok(stream)
}
715
716/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
717///
718/// **This API is marked as unstable** and is only available when the
719/// `unstable-streams` crate feature is enabled. This comes with no
720/// stability guarantees, and could be changed or removed at any time.
721///
722/// # Arguments
723/// * `gen` - The Python async generator to be converted
724///
725/// # Examples
726/// ```
727/// use pyo3::prelude::*;
728/// use futures::{StreamExt, TryStreamExt};
729/// use std::ffi::CString;
730///
731/// const TEST_MOD: &str = r#"
732/// import asyncio
733///
734/// async def gen():
735/// for i in range(10):
736/// await asyncio.sleep(0.1)
737/// yield i
738/// "#;
739///
740/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
741/// # #[pyo3_async_runtimes::async_std::main]
742/// # async fn main() -> PyResult<()> {
743/// let stream = Python::attach(|py| {
744/// let test_mod = PyModule::from_code(
745/// py,
746/// &CString::new(TEST_MOD).unwrap(),
747/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
748/// &CString::new("test_mod").unwrap(),
749/// )?;
750///
751/// pyo3_async_runtimes::async_std::into_stream_v2(test_mod.call_method0("gen")?)
752/// })?;
753///
754/// let vals = stream
755/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
756/// .try_collect::<Vec<i32>>()
757/// .await?;
758///
759/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
760///
761/// Ok(())
762/// # }
763/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
764/// # fn main() {}
765/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin async-std specialisation of the runtime-generic conversion; a setup error is
    // returned to the caller, otherwise the stream is handed back unchanged.
    let stream = generic::into_stream_v2::<AsyncStdRuntime>(gen)?;
    Ok(stream)
}