pyo3_async_runtimes/async_std.rs
1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>async-std-runtime</code></span> PyO3 Asyncio functions specific to the async-std runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//!
9//! are only available when the `unstable-streams` Cargo feature is enabled:
10//!
11//! ```toml
12//! [dependencies.pyo3-async-runtimes]
13//! version = "0.24"
14//! features = ["unstable-streams"]
15//! ```
16
17use async_std::task;
18use futures::FutureExt;
19use pyo3::prelude::*;
20use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin};
21
22use crate::{
23 generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt},
24 TaskLocals,
25};
26
27/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
28/// re-exports for macros
29#[cfg(feature = "attributes")]
30pub mod re_exports {
31 /// re-export spawn_blocking for use in `#[test]` macro without external dependency
32 pub use async_std::task::spawn_blocking;
33}
34
35/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> Provides the boilerplate for the `async-std` runtime and runs an async fn as main
36#[cfg(feature = "attributes")]
37pub use pyo3_async_runtimes_macros::async_std_main as main;
38
39/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
40/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
41/// Registers an `async-std` test with the `pyo3-asyncio` test harness
42#[cfg(all(feature = "attributes", feature = "testing"))]
43pub use pyo3_async_runtimes_macros::async_std_test as test;
44
45struct AsyncStdJoinErr(Box<dyn Any + Send + 'static>);
46
47impl JoinError for AsyncStdJoinErr {
48 fn is_panic(&self) -> bool {
49 true
50 }
51 fn into_panic(self) -> Box<dyn Any + Send + 'static> {
52 self.0
53 }
54}
55
56async_std::task_local! {
57 static TASK_LOCALS: RefCell<Option<TaskLocals>> = RefCell::new(None);
58}
59
60struct AsyncStdRuntime;
61
62impl Runtime for AsyncStdRuntime {
63 type JoinError = AsyncStdJoinErr;
64 type JoinHandle = task::JoinHandle<Result<(), AsyncStdJoinErr>>;
65
66 fn spawn<F>(fut: F) -> Self::JoinHandle
67 where
68 F: Future<Output = ()> + Send + 'static,
69 {
70 task::spawn(async move {
71 AssertUnwindSafe(fut)
72 .catch_unwind()
73 .await
74 .map_err(AsyncStdJoinErr)
75 })
76 }
77}
78
79impl ContextExt for AsyncStdRuntime {
80 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
81 where
82 F: Future<Output = R> + Send + 'static,
83 {
84 let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
85 Box::pin(async move {
86 let result = fut.await;
87 TASK_LOCALS.with(|c| c.replace(old));
88 result
89 })
90 }
91
92 fn get_task_locals() -> Option<TaskLocals> {
93 TASK_LOCALS
94 .try_with(|c| {
95 c.borrow()
96 .as_ref()
97 .map(|locals| Python::with_gil(|py| locals.clone_ref(py)))
98 })
99 .unwrap_or_default()
100 }
101}
102
103impl SpawnLocalExt for AsyncStdRuntime {
104 fn spawn_local<F>(fut: F) -> Self::JoinHandle
105 where
106 F: Future<Output = ()> + 'static,
107 {
108 task::spawn_local(async move {
109 fut.await;
110 Ok(())
111 })
112 }
113}
114
115impl LocalContextExt for AsyncStdRuntime {
116 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
117 where
118 F: Future<Output = R> + 'static,
119 {
120 let old = TASK_LOCALS.with(|c| c.replace(Some(locals)));
121 Box::pin(async move {
122 let result = fut.await;
123 TASK_LOCALS.with(|c| c.replace(old));
124 result
125 })
126 }
127}
128
129/// Set the task local event loop for the given future
130pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
131where
132 F: Future<Output = R> + Send + 'static,
133{
134 AsyncStdRuntime::scope(locals, fut).await
135}
136
137/// Set the task local event loop for the given !Send future
138pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
139where
140 F: Future<Output = R> + 'static,
141{
142 AsyncStdRuntime::scope_local(locals, fut).await
143}
144
145/// Get the current event loop from either Python or Rust async task local context
146///
147/// This function first checks if the runtime has a task-local reference to the Python event loop.
148/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
149/// associated with the current OS thread.
150pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
151 generic::get_current_loop::<AsyncStdRuntime>(py)
152}
153
154/// Either copy the task locals from the current task OR get the current running loop and
155/// contextvars from Python.
156pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
157 generic::get_current_locals::<AsyncStdRuntime>(py)
158}
159
160/// Run the event loop until the given Future completes
161///
162/// The event loop runs until the given future is complete.
163///
164/// After this function returns, the event loop can be resumed with [`run_until_complete`]
165///
166/// # Arguments
167/// * `event_loop` - The Python event loop that should run the future
168/// * `fut` - The future to drive to completion
169///
170/// # Examples
171///
172/// ```
173/// # use std::time::Duration;
174/// #
175/// # use pyo3::prelude::*;
176/// #
177/// # pyo3::prepare_freethreaded_python();
178/// #
179/// # Python::with_gil(|py| -> PyResult<()> {
180/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
181/// pyo3_async_runtimes::async_std::run_until_complete(event_loop, async move {
182/// async_std::task::sleep(Duration::from_secs(1)).await;
183/// Ok(())
184/// })?;
185/// # Ok(())
186/// # }).unwrap();
187/// ```
188pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
189where
190 F: Future<Output = PyResult<T>> + Send + 'static,
191 T: Send + Sync + 'static,
192{
193 generic::run_until_complete::<AsyncStdRuntime, _, T>(&event_loop, fut)
194}
195
196/// Run the event loop until the given Future completes
197///
198/// # Arguments
199/// * `py` - The current PyO3 GIL guard
200/// * `fut` - The future to drive to completion
201///
202/// # Examples
203///
204/// ```no_run
205/// # use std::time::Duration;
206/// #
207/// # use pyo3::prelude::*;
208/// #
209/// fn main() {
210/// // call this or use pyo3 0.14 "auto-initialize" feature
211/// pyo3::prepare_freethreaded_python();
212///
213/// Python::with_gil(|py| {
214/// pyo3_async_runtimes::async_std::run(py, async move {
215/// async_std::task::sleep(Duration::from_secs(1)).await;
216/// Ok(())
217/// })
218/// .map_err(|e| {
219/// e.print_and_set_sys_last_vars(py);
220/// })
221/// .unwrap();
222/// })
223/// }
224/// ```
225pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
226where
227 F: Future<Output = PyResult<T>> + Send + 'static,
228 T: Send + Sync + 'static,
229{
230 generic::run::<AsyncStdRuntime, F, T>(py, fut)
231}
232
233/// Convert a Rust Future into a Python awaitable
234///
235/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
236/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
237///
238/// Python `contextvars` are preserved when calling async Python functions within the Rust future
239/// via [`into_future`] (new behaviour in `v0.15`).
240///
241/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
242/// > unfortunately fail to resolve them when called within the Rust future. This is because the
243/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
244/// >
245/// > As a workaround, you can get the `contextvars` from the current task locals using
246/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
247/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
248/// > synchronous function, and restore the previous context when it returns or raises an exception.
249///
250/// # Arguments
251/// * `py` - PyO3 GIL guard
252/// * `locals` - The task locals for the given future
253/// * `fut` - The Rust future to be converted
254///
255/// # Examples
256///
257/// ```
258/// use std::time::Duration;
259///
260/// use pyo3::prelude::*;
261///
262/// /// Awaitable sleep function
263/// #[pyfunction]
264/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
265/// let secs = secs.extract()?;
266/// pyo3_async_runtimes::async_std::future_into_py_with_locals(
267/// py,
268/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
269/// async move {
270/// async_std::task::sleep(Duration::from_secs(secs)).await;
271/// Python::with_gil(|py| Ok(py.None()))
272/// }
273/// )
274/// }
275/// ```
276pub fn future_into_py_with_locals<F, T>(
277 py: Python,
278 locals: TaskLocals,
279 fut: F,
280) -> PyResult<Bound<PyAny>>
281where
282 F: Future<Output = PyResult<T>> + Send + 'static,
283 T: for<'py> IntoPyObject<'py>,
284{
285 generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
286}
287
288/// Convert a Rust Future into a Python awaitable
289///
290/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
291/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
292///
293/// Python `contextvars` are preserved when calling async Python functions within the Rust future
294/// via [`into_future`] (new behaviour in `v0.15`).
295///
296/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
297/// > unfortunately fail to resolve them when called within the Rust future. This is because the
298/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
299/// >
300/// > As a workaround, you can get the `contextvars` from the current task locals using
301/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
302/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
303/// > synchronous function, and restore the previous context when it returns or raises an exception.
304///
305/// # Arguments
306/// * `py` - The current PyO3 GIL guard
307/// * `fut` - The Rust future to be converted
308///
309/// # Examples
310///
311/// ```
312/// use std::time::Duration;
313///
314/// use pyo3::prelude::*;
315///
316/// /// Awaitable sleep function
317/// #[pyfunction]
318/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
319/// let secs = secs.extract()?;
320/// pyo3_async_runtimes::async_std::future_into_py(py, async move {
321/// async_std::task::sleep(Duration::from_secs(secs)).await;
322/// Ok(())
323/// })
324/// }
325/// ```
326pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
327where
328 F: Future<Output = PyResult<T>> + Send + 'static,
329 T: for<'py> IntoPyObject<'py>,
330{
331 generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
332}
333
334/// Convert a `!Send` Rust Future into a Python awaitable
335///
336/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
337/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
338///
339/// Python `contextvars` are preserved when calling async Python functions within the Rust future
340/// via [`into_future`] (new behaviour in `v0.15`).
341///
342/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
343/// > unfortunately fail to resolve them when called within the Rust future. This is because the
344/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
345/// >
346/// > As a workaround, you can get the `contextvars` from the current task locals using
347/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
348/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
349/// > synchronous function, and restore the previous context when it returns or raises an exception.
350///
351/// # Arguments
352/// * `py` - PyO3 GIL guard
353/// * `locals` - The task locals for the given future
354/// * `fut` - The Rust future to be converted
355///
356/// # Examples
357///
358/// ```
359/// use std::{rc::Rc, time::Duration};
360///
361/// use pyo3::prelude::*;
362///
363/// /// Awaitable non-send sleep function
364/// #[pyfunction]
365/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
366/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
367/// let secs = Rc::new(secs);
368/// Ok(pyo3_async_runtimes::async_std::local_future_into_py_with_locals(
369/// py,
370/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
371/// async move {
372/// async_std::task::sleep(Duration::from_secs(*secs)).await;
373/// Python::with_gil(|py| Ok(py.None()))
374/// }
375/// )?.into())
376/// }
377///
378/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
379/// #[pyo3_async_runtimes::async_std::main]
380/// async fn main() -> PyResult<()> {
381/// Python::with_gil(|py| {
382/// let py_future = sleep_for(py, 1)?;
383/// pyo3_async_runtimes::async_std::into_future(py_future)
384/// })?
385/// .await?;
386///
387/// Ok(())
388/// }
389/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
390/// # fn main() {}
391/// ```
392#[deprecated(
393 since = "0.18.0",
394 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
395)]
396#[allow(deprecated)]
397pub fn local_future_into_py_with_locals<F, T>(
398 py: Python,
399 locals: TaskLocals,
400 fut: F,
401) -> PyResult<Bound<PyAny>>
402where
403 F: Future<Output = PyResult<T>> + 'static,
404 T: for<'py> IntoPyObject<'py>,
405{
406 generic::local_future_into_py_with_locals::<AsyncStdRuntime, _, T>(py, locals, fut)
407}
408
409/// Convert a `!Send` Rust Future into a Python awaitable
410///
411/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
412/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
413///
414/// Python `contextvars` are preserved when calling async Python functions within the Rust future
415/// via [`into_future`] (new behaviour in `v0.15`).
416///
417/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
418/// > unfortunately fail to resolve them when called within the Rust future. This is because the
419/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
420/// >
421/// > As a workaround, you can get the `contextvars` from the current task locals using
422/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
423/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
424/// > synchronous function, and restore the previous context when it returns or raises an exception.
425///
426/// # Arguments
427/// * `py` - The current PyO3 GIL guard
428/// * `fut` - The Rust future to be converted
429///
430/// # Examples
431///
432/// ```
433/// use std::{rc::Rc, time::Duration};
434///
435/// use pyo3::prelude::*;
436///
437/// /// Awaitable non-send sleep function
438/// #[pyfunction]
439/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
440/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py
441/// let secs = Rc::new(secs);
442/// pyo3_async_runtimes::async_std::local_future_into_py(py, async move {
443/// async_std::task::sleep(Duration::from_secs(*secs)).await;
444/// Ok(())
445/// })
446/// }
447///
448/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))]
449/// #[pyo3_async_runtimes::async_std::main]
450/// async fn main() -> PyResult<()> {
451/// Python::with_gil(|py| {
452/// let py_future = sleep_for(py, 1)?;
453/// pyo3_async_runtimes::async_std::into_future(py_future)
454/// })?
455/// .await?;
456///
457/// Ok(())
458/// }
459/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))]
460/// # fn main() {}
461/// ```
462#[deprecated(
463 since = "0.18.0",
464 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
465)]
466#[allow(deprecated)]
467pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
468where
469 F: Future<Output = PyResult<T>> + 'static,
470 T: for<'py> IntoPyObject<'py>,
471{
472 generic::local_future_into_py::<AsyncStdRuntime, _, T>(py, fut)
473}
474
475/// Convert a Python `awaitable` into a Rust Future
476///
477/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
478/// completion handler sends the result of this Task through a
479/// `futures::channel::oneshot::Sender<PyResult<PyObject>>` and the future returned by this function
480/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<PyObject>>`.
481///
482/// # Arguments
483/// * `awaitable` - The Python `awaitable` to be converted
484///
485/// # Examples
486///
487/// ```
488/// use std::time::Duration;
489/// use std::ffi::CString;
490///
491/// use pyo3::prelude::*;
492///
493/// const PYTHON_CODE: &'static str = r#"
494/// import asyncio
495///
496/// async def py_sleep(duration):
497/// await asyncio.sleep(duration)
498/// "#;
499///
500/// async fn py_sleep(seconds: f32) -> PyResult<()> {
501/// let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
502/// Ok(
503/// PyModule::from_code(
504/// py,
505/// &CString::new(PYTHON_CODE).unwrap(),
506/// &CString::new("test_into_future/test_mod.py").unwrap(),
507/// &CString::new("test_mod").unwrap()
508/// )?
509/// .into()
510/// )
511/// })?;
512///
513/// Python::with_gil(|py| {
514/// pyo3_async_runtimes::async_std::into_future(
515/// test_mod
516/// .call_method1(py, "py_sleep", (seconds.into_pyobject(py).unwrap(),))?
517/// .into_bound(py),
518/// )
519/// })?
520/// .await?;
521/// Ok(())
522/// }
523/// ```
524pub fn into_future(
525 awaitable: Bound<PyAny>,
526) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
527 generic::into_future::<AsyncStdRuntime>(awaitable)
528}
529
530/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
531///
532/// **This API is marked as unstable** and is only available when the
533/// `unstable-streams` crate feature is enabled. This comes with no
534/// stability guarantees, and could be changed or removed at any time.
535///
536/// # Arguments
537/// * `gen` - The Python async generator to be converted
538///
539/// # Examples
540/// ```
541/// use pyo3::prelude::*;
542/// use futures::{StreamExt, TryStreamExt};
543/// use std::ffi::CString;
544///
545/// const TEST_MOD: &str = r#"
546/// import asyncio
547///
548/// async def gen():
549/// for i in range(10):
550/// await asyncio.sleep(0.1)
551/// yield i
552/// "#;
553///
554/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
555/// # #[pyo3_async_runtimes::async_std::main]
556/// # async fn main() -> PyResult<()> {
557/// let stream = Python::with_gil(|py| {
558/// let test_mod = PyModule::from_code(
559/// py,
560/// &CString::new(TEST_MOD).unwrap(),
561/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
562/// &CString::new("test_mod").unwrap(),
563/// )?;
564///
565/// pyo3_async_runtimes::async_std::into_stream_v1(test_mod.call_method0("gen")?)
566/// })?;
567///
568/// let vals = stream
569/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
570/// .try_collect::<Vec<i32>>()
571/// .await?;
572///
573/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
574///
575/// Ok(())
576/// # }
577/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
578/// # fn main() {}
579/// ```
580#[cfg(feature = "unstable-streams")]
581pub fn into_stream_v1(
582 gen: Bound<'_, PyAny>,
583) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
584 generic::into_stream_v1::<AsyncStdRuntime>(gen)
585}
586
587/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
588///
589/// **This API is marked as unstable** and is only available when the
590/// `unstable-streams` crate feature is enabled. This comes with no
591/// stability guarantees, and could be changed or removed at any time.
592///
593/// # Arguments
594/// * `locals` - The current task locals
595/// * `gen` - The Python async generator to be converted
596///
597/// # Examples
598/// ```
599/// use pyo3::prelude::*;
600/// use futures::{StreamExt, TryStreamExt};
601/// use std::ffi::CString;
602///
603/// const TEST_MOD: &str = r#"
604/// import asyncio
605///
606/// async def gen():
607/// for i in range(10):
608/// await asyncio.sleep(0.1)
609/// yield i
610/// "#;
611///
612/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
613/// # #[pyo3_async_runtimes::async_std::main]
614/// # async fn main() -> PyResult<()> {
615/// let stream = Python::with_gil(|py| {
616/// let test_mod = PyModule::from_code(
617/// py,
618/// &CString::new(TEST_MOD).unwrap(),
619/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
620/// &CString::new("test_mod").unwrap(),
621/// )?;
622///
623/// pyo3_async_runtimes::async_std::into_stream_with_locals_v1(
624/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
625/// test_mod.call_method0("gen")?
626/// )
627/// })?;
628///
629/// let vals = stream
630/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
631/// .try_collect::<Vec<i32>>()
632/// .await?;
633///
634/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
635///
636/// Ok(())
637/// # }
638/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
639/// # fn main() {}
640/// ```
641#[cfg(feature = "unstable-streams")]
642pub fn into_stream_with_locals_v1(
643 locals: TaskLocals,
644 gen: Bound<'_, PyAny>,
645) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
646 generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
647}
648
649/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
650///
651/// **This API is marked as unstable** and is only available when the
652/// `unstable-streams` crate feature is enabled. This comes with no
653/// stability guarantees, and could be changed or removed at any time.
654///
655/// # Arguments
656/// * `locals` - The current task locals
657/// * `gen` - The Python async generator to be converted
658///
659/// # Examples
660/// ```
661/// use pyo3::prelude::*;
662/// use futures::{StreamExt, TryStreamExt};
663/// use std::ffi::CString;
664///
665/// const TEST_MOD: &str = r#"
666/// import asyncio
667///
668/// async def gen():
669/// for i in range(10):
670/// await asyncio.sleep(0.1)
671/// yield i
672/// "#;
673///
674/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
675/// # #[pyo3_async_runtimes::async_std::main]
676/// # async fn main() -> PyResult<()> {
677/// let stream = Python::with_gil(|py| {
678/// let test_mod = PyModule::from_code(
679/// py,
680/// &CString::new(TEST_MOD).unwrap(),
681/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
682/// &CString::new("test_mod").unwrap(),
683/// )?;
684///
685/// pyo3_async_runtimes::async_std::into_stream_with_locals_v2(
686/// pyo3_async_runtimes::async_std::get_current_locals(py)?,
687/// test_mod.call_method0("gen")?
688/// )
689/// })?;
690///
691/// let vals = stream
692/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
693/// .try_collect::<Vec<i32>>()
694/// .await?;
695///
696/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
697///
698/// Ok(())
699/// # }
700/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
701/// # fn main() {}
702/// ```
703#[cfg(feature = "unstable-streams")]
704pub fn into_stream_with_locals_v2(
705 locals: TaskLocals,
706 gen: Bound<'_, PyAny>,
707) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
708 generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
709}
710
711/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
712///
713/// **This API is marked as unstable** and is only available when the
714/// `unstable-streams` crate feature is enabled. This comes with no
715/// stability guarantees, and could be changed or removed at any time.
716///
717/// # Arguments
718/// * `gen` - The Python async generator to be converted
719///
720/// # Examples
721/// ```
722/// use pyo3::prelude::*;
723/// use futures::{StreamExt, TryStreamExt};
724/// use std::ffi::CString;
725///
726/// const TEST_MOD: &str = r#"
727/// import asyncio
728///
729/// async def gen():
730/// for i in range(10):
731/// await asyncio.sleep(0.1)
732/// yield i
733/// "#;
734///
735/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
736/// # #[pyo3_async_runtimes::async_std::main]
737/// # async fn main() -> PyResult<()> {
738/// let stream = Python::with_gil(|py| {
739/// let test_mod = PyModule::from_code(
740/// py,
741/// &CString::new(TEST_MOD).unwrap(),
742/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
743/// &CString::new("test_mod").unwrap(),
744/// )?;
745///
746/// pyo3_async_runtimes::async_std::into_stream_v2(test_mod.call_method0("gen")?)
747/// })?;
748///
749/// let vals = stream
750/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
751/// .try_collect::<Vec<i32>>()
752/// .await?;
753///
754/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
755///
756/// Ok(())
757/// # }
758/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
759/// # fn main() {}
760/// ```
761#[cfg(feature = "unstable-streams")]
762pub fn into_stream_v2(
763 gen: Bound<'_, PyAny>,
764) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
765 generic::into_stream_v2::<AsyncStdRuntime>(gen)
766}