pyo3_async_runtimes/
tokio.rs

1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-async-runtimes]
12//! version = "0.24"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::cell::OnceCell;
17use std::ops::Deref;
18use std::sync::OnceLock;
19use std::{future::Future, pin::Pin, sync::Mutex};
20
21use ::tokio::{
22    runtime::{Builder, Runtime},
23    task,
24};
25use once_cell::sync::Lazy;
26use pyo3::prelude::*;
27
28use crate::{
29    generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
30    TaskLocals,
31};
32
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Items re-exported for use by the tokio macros
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// `tokio::runtime`, re-exported so the tokio macros can build runtimes without an
    /// additional dependency
    pub use tokio::runtime;

    /// `futures::future::pending`, re-exported so the tokio macros can use it without an
    /// additional dependency
    pub use futures::future::pending;
}
42
43/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
44#[cfg(feature = "attributes")]
45pub use pyo3_async_runtimes_macros::tokio_main as main;
46
47/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
48/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
49/// Registers a `tokio` test with the `pyo3-asyncio` test harness
50#[cfg(all(feature = "attributes", feature = "testing"))]
51pub use pyo3_async_runtimes_macros::tokio_test as test;
52
53enum Pyo3Runtime {
54    Borrowed(&'static Runtime),
55    Owned(Runtime),
56}
57impl Deref for Pyo3Runtime {
58    type Target = Runtime;
59    fn deref(&self) -> &Self::Target {
60        match self {
61            Self::Borrowed(rt) => rt,
62            Self::Owned(rt) => rt,
63        }
64    }
65}
66
67static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
68static TOKIO_RUNTIME: OnceLock<Pyo3Runtime> = OnceLock::new();
69
70impl generic::JoinError for task::JoinError {
71    fn is_panic(&self) -> bool {
72        task::JoinError::is_panic(self)
73    }
74    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
75        task::JoinError::into_panic(self)
76    }
77}
78
79struct TokioRuntime;
80
81tokio::task_local! {
82    static TASK_LOCALS: OnceCell<TaskLocals>;
83}
84
85impl GenericRuntime for TokioRuntime {
86    type JoinError = task::JoinError;
87    type JoinHandle = task::JoinHandle<()>;
88
89    fn spawn<F>(fut: F) -> Self::JoinHandle
90    where
91        F: Future<Output = ()> + Send + 'static,
92    {
93        get_runtime().spawn(async move {
94            fut.await;
95        })
96    }
97
98    fn spawn_blocking<F>(f: F) -> Self::JoinHandle
99    where
100        F: FnOnce() + Send + 'static,
101    {
102        get_runtime().spawn_blocking(f)
103    }
104}
105
106impl ContextExt for TokioRuntime {
107    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
108    where
109        F: Future<Output = R> + Send + 'static,
110    {
111        let cell = OnceCell::new();
112        cell.set(locals).unwrap();
113
114        Box::pin(TASK_LOCALS.scope(cell, fut))
115    }
116
117    fn get_task_locals() -> Option<TaskLocals> {
118        TASK_LOCALS
119            .try_with(|c| c.get().map(|locals| locals.clone()))
120            .unwrap_or_default()
121    }
122}
123
124impl SpawnLocalExt for TokioRuntime {
125    fn spawn_local<F>(fut: F) -> Self::JoinHandle
126    where
127        F: Future<Output = ()> + 'static,
128    {
129        tokio::task::spawn_local(fut)
130    }
131}
132
133impl LocalContextExt for TokioRuntime {
134    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
135    where
136        F: Future<Output = R> + 'static,
137    {
138        let cell = OnceCell::new();
139        cell.set(locals).unwrap();
140
141        Box::pin(TASK_LOCALS.scope(cell, fut))
142    }
143}
144
145/// Set the task local event loop for the given future
146pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
147where
148    F: Future<Output = R> + Send + 'static,
149{
150    TokioRuntime::scope(locals, fut).await
151}
152
153/// Set the task local event loop for the given !Send future
154pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
155where
156    F: Future<Output = R> + 'static,
157{
158    TokioRuntime::scope_local(locals, fut).await
159}
160
161/// Get the current event loop from either Python or Rust async task local context
162///
163/// This function first checks if the runtime has a task-local reference to the Python event loop.
164/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
165/// associated with the current OS thread.
166pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
167    generic::get_current_loop::<TokioRuntime>(py)
168}
169
170/// Either copy the task locals from the current task OR get the current running loop and
171/// contextvars from Python.
172pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
173    generic::get_current_locals::<TokioRuntime>(py)
174}
175
176/// Initialize the Tokio runtime with a custom build
177pub fn init(builder: Builder) {
178    *TOKIO_BUILDER.lock().unwrap() = builder
179}
180
181/// Initialize the Tokio runtime with a custom Tokio runtime
182///
183/// Returns Ok(()) if success and Err(()) if it had been inited.
184#[allow(clippy::result_unit_err)]
185pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
186    TOKIO_RUNTIME
187        .set(Pyo3Runtime::Borrowed(runtime))
188        .map_err(|_| ())
189}
190
191/// Get a reference to the current tokio runtime
192pub fn get_runtime<'a>() -> &'a Runtime {
193    TOKIO_RUNTIME.get_or_init(|| {
194        let rt = TOKIO_BUILDER
195            .lock()
196            .unwrap()
197            .build()
198            .expect("Unable to build Tokio runtime");
199        Pyo3Runtime::Owned(rt)
200    })
201}
202
203fn multi_thread() -> Builder {
204    let mut builder = Builder::new_multi_thread();
205    builder.enable_all();
206    builder
207}
208
209/// Run the event loop until the given Future completes
210///
211/// The event loop runs until the given future is complete.
212///
213/// After this function returns, the event loop can be resumed with [`run_until_complete`]
214///
215/// # Arguments
216/// * `event_loop` - The Python event loop that should run the future
217/// * `fut` - The future to drive to completion
218///
219/// # Examples
220///
221/// ```
222/// # use std::time::Duration;
223/// #
224/// # use pyo3::prelude::*;
225/// #
226/// # Python::initialize();
227/// # Python::attach(|py| -> PyResult<()> {
228/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
229/// pyo3_async_runtimes::tokio::run_until_complete(event_loop, async move {
230///     tokio::time::sleep(Duration::from_secs(1)).await;
231///     Ok(())
232/// })?;
233/// # Ok(())
234/// # }).unwrap();
235/// ```
236pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
237where
238    F: Future<Output = PyResult<T>> + Send + 'static,
239    T: Send + Sync + 'static,
240{
241    generic::run_until_complete::<TokioRuntime, _, T>(&event_loop, fut)
242}
243
244/// Run the event loop until the given Future completes
245///
246/// # Arguments
247/// * `py` - The current PyO3 GIL guard
248/// * `fut` - The future to drive to completion
249///
250/// # Examples
251///
252/// ```no_run
253/// # use std::time::Duration;
254/// #
255/// # use pyo3::prelude::*;
256/// #
257/// fn main() {
258///     Python::attach(|py| {
259///         pyo3_async_runtimes::tokio::run(py, async move {
260///             tokio::time::sleep(Duration::from_secs(1)).await;
261///             Ok(())
262///         })
263///         .map_err(|e| {
264///             e.print_and_set_sys_last_vars(py);
265///         })
266///         .unwrap();
267///     })
268/// }
269/// ```
270pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
271where
272    F: Future<Output = PyResult<T>> + Send + 'static,
273    T: Send + Sync + 'static,
274{
275    generic::run::<TokioRuntime, F, T>(py, fut)
276}
277
278/// Convert a Rust Future into a Python awaitable
279///
280/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
281/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
282///
283/// Python `contextvars` are preserved when calling async Python functions within the Rust future
284/// via [`into_future`] (new behaviour in `v0.15`).
285///
286/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
287/// > unfortunately fail to resolve them when called within the Rust future. This is because the
288/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
289/// >
290/// > As a workaround, you can get the `contextvars` from the current task locals using
291/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
292/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
293/// > synchronous function, and restore the previous context when it returns or raises an exception.
294///
295/// # Arguments
296/// * `py` - PyO3 GIL guard
297/// * `locals` - The task locals for the given future
298/// * `fut` - The Rust future to be converted
299///
300/// # Examples
301///
302/// ```
303/// use std::time::Duration;
304///
305/// use pyo3::prelude::*;
306///
307/// /// Awaitable sleep function
308/// #[pyfunction]
309/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
310///     let secs = secs.extract()?;
311///     pyo3_async_runtimes::tokio::future_into_py_with_locals(
312///         py,
313///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
314///         async move {
315///             tokio::time::sleep(Duration::from_secs(secs)).await;
316///             Python::attach(|py| Ok(py.None()))
317///         }
318///     )
319/// }
320/// ```
321pub fn future_into_py_with_locals<F, T>(
322    py: Python,
323    locals: TaskLocals,
324    fut: F,
325) -> PyResult<Bound<PyAny>>
326where
327    F: Future<Output = PyResult<T>> + Send + 'static,
328    T: for<'py> IntoPyObject<'py> + Send + 'static,
329{
330    generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
331}
332
333/// Convert a Rust Future into a Python awaitable
334///
335/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
336/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
337///
338/// Python `contextvars` are preserved when calling async Python functions within the Rust future
339/// via [`into_future`] (new behaviour in `v0.15`).
340///
341/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
342/// > unfortunately fail to resolve them when called within the Rust future. This is because the
343/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
344/// >
345/// > As a workaround, you can get the `contextvars` from the current task locals using
346/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
347/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
348/// > synchronous function, and restore the previous context when it returns or raises an exception.
349///
350/// # Arguments
351/// * `py` - The current PyO3 GIL guard
352/// * `fut` - The Rust future to be converted
353///
354/// # Examples
355///
356/// ```
357/// use std::time::Duration;
358///
359/// use pyo3::prelude::*;
360///
361/// /// Awaitable sleep function
362/// #[pyfunction]
363/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
364///     let secs = secs.extract()?;
365///     pyo3_async_runtimes::tokio::future_into_py(py, async move {
366///         tokio::time::sleep(Duration::from_secs(secs)).await;
367///         Ok(())
368///     })
369/// }
370/// ```
371pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
372where
373    F: Future<Output = PyResult<T>> + Send + 'static,
374    T: for<'py> IntoPyObject<'py> + Send + 'static,
375{
376    generic::future_into_py::<TokioRuntime, _, T>(py, fut)
377}
378
379/// Convert a `!Send` Rust Future into a Python awaitable
380///
381/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
382/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
383///
384/// Python `contextvars` are preserved when calling async Python functions within the Rust future
385/// via [`into_future`] (new behaviour in `v0.15`).
386///
387/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
388/// > unfortunately fail to resolve them when called within the Rust future. This is because the
389/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
390/// >
391/// > As a workaround, you can get the `contextvars` from the current task locals using
392/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
393/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
394/// > synchronous function, and restore the previous context when it returns or raises an exception.
395///
396/// # Arguments
397/// * `py` - PyO3 GIL guard
398/// * `locals` - The task locals for the given future
399/// * `fut` - The Rust future to be converted
400///
401/// # Examples
402///
403/// ```
404/// use std::{rc::Rc, time::Duration};
405///
406/// use pyo3::prelude::*;
407///
408/// /// Awaitable non-send sleep function
409/// #[pyfunction]
410/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
411///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
412///     let secs = Rc::new(secs);
413///
414///     pyo3_async_runtimes::tokio::local_future_into_py_with_locals(
415///         py,
416///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
417///         async move {
418///             tokio::time::sleep(Duration::from_secs(*secs)).await;
419///             Python::attach(|py| Ok(py.None()))
420///         }
421///     )
422/// }
423///
424/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
425/// #[pyo3_async_runtimes::tokio::main]
426/// async fn main() -> PyResult<()> {
427///     let locals = Python::attach(|py| -> PyResult<_> {
428///         pyo3_async_runtimes::tokio::get_current_locals(py)
429///     })?;
430///
431///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
432///     // we use spawn_blocking in order to use LocalSet::block_on
433///     tokio::task::spawn_blocking(move || {
434///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
435///         // pyo3_async_runtimes::tokio::local_future_into_py will panic.
436///         tokio::task::LocalSet::new().block_on(
437///             pyo3_async_runtimes::tokio::get_runtime(),
438///             pyo3_async_runtimes::tokio::scope_local(locals, async {
439///                 Python::attach(|py| {
440///                     let py_future = sleep_for(py, 1)?;
441///                     pyo3_async_runtimes::tokio::into_future(py_future)
442///                 })?
443///                 .await?;
444///
445///                 Ok(())
446///             })
447///         )
448///     }).await.unwrap()
449/// }
450/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
451/// # fn main() {}
452/// ```
453#[deprecated(
454    since = "0.18.0",
455    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
456)]
457#[allow(deprecated)]
458pub fn local_future_into_py_with_locals<F, T>(
459    py: Python,
460    locals: TaskLocals,
461    fut: F,
462) -> PyResult<Bound<PyAny>>
463where
464    F: Future<Output = PyResult<T>> + 'static,
465    T: for<'py> IntoPyObject<'py>,
466{
467    generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
468}
469
470/// Convert a `!Send` Rust Future into a Python awaitable
471///
472/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
473/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
474///
475/// Python `contextvars` are preserved when calling async Python functions within the Rust future
476/// via [`into_future`] (new behaviour in `v0.15`).
477///
478/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
479/// > unfortunately fail to resolve them when called within the Rust future. This is because the
480/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
481/// >
482/// > As a workaround, you can get the `contextvars` from the current task locals using
483/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
484/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
485/// > synchronous function, and restore the previous context when it returns or raises an exception.
486///
487/// # Arguments
488/// * `py` - The current PyO3 GIL guard
489/// * `fut` - The Rust future to be converted
490///
491/// # Examples
492///
493/// ```
494/// use std::{rc::Rc, time::Duration};
495///
496/// use pyo3::prelude::*;
497///
498/// /// Awaitable non-send sleep function
499/// #[pyfunction]
500/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
501///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
502///     let secs = Rc::new(secs);
503///     pyo3_async_runtimes::tokio::local_future_into_py(py, async move {
504///         tokio::time::sleep(Duration::from_secs(*secs)).await;
505///         Ok(())
506///     })
507/// }
508///
509/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
510/// #[pyo3_async_runtimes::tokio::main]
511/// async fn main() -> PyResult<()> {
512///     let locals = Python::attach(|py| {
513///         pyo3_async_runtimes::tokio::get_current_locals(py).unwrap()
514///     });
515///
516///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
517///     // we use spawn_blocking in order to use LocalSet::block_on
518///     tokio::task::spawn_blocking(move || {
519///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
520///         // pyo3_async_runtimes::tokio::local_future_into_py will panic.
521///         tokio::task::LocalSet::new().block_on(
522///             pyo3_async_runtimes::tokio::get_runtime(),
523///             pyo3_async_runtimes::tokio::scope_local(locals, async {
524///                 Python::attach(|py| {
525///                     let py_future = sleep_for(py, 1)?;
526///                     pyo3_async_runtimes::tokio::into_future(py_future)
527///                 })?
528///                 .await?;
529///
530///                 Ok(())
531///             })
532///         )
533///     }).await.unwrap()
534/// }
535/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
536/// # fn main() {}
537/// ```
538#[deprecated(
539    since = "0.18.0",
540    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
541)]
542#[allow(deprecated)]
543pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
544where
545    F: Future<Output = PyResult<T>> + 'static,
546    T: for<'py> IntoPyObject<'py>,
547{
548    generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
549}
550
551/// Convert a Python `awaitable` into a Rust Future
552///
553/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
554/// completion handler sends the result of this Task through a
555/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
556/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
557///
558/// # Arguments
559/// * `awaitable` - The Python `awaitable` to be converted
560///
561/// # Examples
562///
563/// ```
564/// use std::time::Duration;
565/// use std::ffi::CString;
566///
567/// use pyo3::prelude::*;
568///
569/// const PYTHON_CODE: &'static str = r#"
570/// import asyncio
571///
572/// async def py_sleep(duration):
573///     await asyncio.sleep(duration)
574/// "#;
575///
576/// async fn py_sleep(seconds: f32) -> PyResult<()> {
577///     let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
578///         Ok(
579///             PyModule::from_code(
580///                 py,
581///                 &CString::new(PYTHON_CODE).unwrap(),
582///                 &CString::new("test_into_future/test_mod.py").unwrap(),
583///                 &CString::new("test_mod").unwrap(),
584///             )?
585///             .into()
586///         )
587///     })?;
588///
589///     Python::attach(|py| {
590///         pyo3_async_runtimes::tokio::into_future(
591///             test_mod
592///                 .call_method1(py, "py_sleep", (seconds,))?
593///                 .into_bound(py),
594///         )
595///     })?
596///     .await?;
597///     Ok(())
598/// }
599/// ```
600pub fn into_future(
601    awaitable: Bound<PyAny>,
602) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
603    generic::into_future::<TokioRuntime>(awaitable)
604}
605
606/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
607///
608/// **This API is marked as unstable** and is only available when the
609/// `unstable-streams` crate feature is enabled. This comes with no
610/// stability guarantees, and could be changed or removed at any time.
611///
612/// # Arguments
613/// * `locals` - The current task locals
614/// * `gen` - The Python async generator to be converted
615///
616/// # Examples
617/// ```
618/// use pyo3::prelude::*;
619/// use futures::{StreamExt, TryStreamExt};
620/// use std::ffi::CString;
621///
622/// const TEST_MOD: &str = r#"
623/// import asyncio
624///
625/// async def gen():
626///     for i in range(10):
627///         await asyncio.sleep(0.1)
628///         yield i
629/// "#;
630///
631/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
632/// # #[pyo3_async_runtimes::tokio::main]
633/// # async fn main() -> PyResult<()> {
634/// let stream = Python::attach(|py| {
635///     let test_mod = PyModule::from_code(
636///         py,
637///         &CString::new(TEST_MOD).unwrap(),
638///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
639///         &CString::new("test_mod").unwrap(),
640///     )?;
641///
642///     pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
643///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
644///         test_mod.call_method0("gen")?
645///     )
646/// })?;
647///
648/// let vals = stream
649///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
650///     .try_collect::<Vec<i32>>()
651///     .await?;
652///
653/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
654///
655/// Ok(())
656/// # }
657/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
658/// # fn main() {}
659/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Forward to the runtime-generic implementation with Tokio as the runtime.
    let stream = generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen);
    stream
}
667
668/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
669///
670/// **This API is marked as unstable** and is only available when the
671/// `unstable-streams` crate feature is enabled. This comes with no
672/// stability guarantees, and could be changed or removed at any time.
673///
674/// # Arguments
675/// * `gen` - The Python async generator to be converted
676///
677/// # Examples
678/// ```
679/// use pyo3::prelude::*;
680/// use futures::{StreamExt, TryStreamExt};
681/// use std::ffi::CString;
682///
683/// const TEST_MOD: &str = r#"
684/// import asyncio
685///
686/// async def gen():
687///     for i in range(10):
688///         await asyncio.sleep(0.1)
689///         yield i
690/// "#;
691///
692/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
693/// # #[pyo3_async_runtimes::tokio::main]
694/// # async fn main() -> PyResult<()> {
695/// let stream = Python::attach(|py| {
696///     let test_mod = PyModule::from_code(
697///         py,
698///         &CString::new(TEST_MOD).unwrap(),
699///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
700///         &CString::new("test_mod").unwrap(),
701///     )?;
702///
703///     pyo3_async_runtimes::tokio::into_stream_v1(test_mod.call_method0("gen")?)
704/// })?;
705///
706/// let vals = stream
707///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
708///     .try_collect::<Vec<i32>>()
709///     .await?;
710///
711/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
712///
713/// Ok(())
714/// # }
715/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
716/// # fn main() {}
717/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Forward to the runtime-generic implementation with Tokio as the runtime.
    let stream = generic::into_stream_v1::<TokioRuntime>(gen);
    stream
}
724
725/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
726///
727/// **This API is marked as unstable** and is only available when the
728/// `unstable-streams` crate feature is enabled. This comes with no
729/// stability guarantees, and could be changed or removed at any time.
730///
731/// # Arguments
732/// * `locals` - The current task locals
733/// * `gen` - The Python async generator to be converted
734///
735/// # Examples
736/// ```
737/// use pyo3::prelude::*;
738/// use futures::{StreamExt, TryStreamExt};
739/// use std::ffi::CString;
740///
741/// const TEST_MOD: &str = r#"
742/// import asyncio
743///
744/// async def gen():
745///     for i in range(10):
746///         await asyncio.sleep(0.1)
747///         yield i
748/// "#;
749///
750/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
751/// # #[pyo3_async_runtimes::tokio::main]
752/// # async fn main() -> PyResult<()> {
753/// let stream = Python::attach(|py| {
754///     let test_mod = PyModule::from_code(
755///         py,
756///         &CString::new(TEST_MOD).unwrap(),
757///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
758///         &CString::new("test_mod").unwrap(),
759///     )?;
760///
761///     pyo3_async_runtimes::tokio::into_stream_with_locals_v2(
762///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
763///         test_mod.call_method0("gen")?
764///     )
765/// })?;
766///
767/// let vals = stream
768///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
769///     .try_collect::<Vec<i32>>()
770///     .await?;
771///
772/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
773///
774/// Ok(())
775/// # }
776/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
777/// # fn main() {}
778/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Forward to the runtime-generic implementation with Tokio as the runtime.
    let stream = generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen);
    stream
}
786
787/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
788///
789/// **This API is marked as unstable** and is only available when the
790/// `unstable-streams` crate feature is enabled. This comes with no
791/// stability guarantees, and could be changed or removed at any time.
792///
793/// # Arguments
794/// * `gen` - The Python async generator to be converted
795///
796/// # Examples
797/// ```
798/// use pyo3::prelude::*;
799/// use futures::{StreamExt, TryStreamExt};
800/// use std::ffi::CString;
801///
802/// const TEST_MOD: &str = r#"
803/// import asyncio
804///
805/// async def gen():
806///     for i in range(10):
807///         await asyncio.sleep(0.1)
808///         yield i
809/// "#;
810///
811/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
812/// # #[pyo3_async_runtimes::tokio::main]
813/// # async fn main() -> PyResult<()> {
814/// let stream = Python::attach(|py| {
815///     let test_mod = PyModule::from_code(
816///         py,
817///         &CString::new(TEST_MOD).unwrap(),
818///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
819///         &CString::new("test_mod").unwrap(),
820///     )?;
821///
822///     pyo3_async_runtimes::tokio::into_stream_v2(test_mod.call_method0("gen")?)
823/// })?;
824///
825/// let vals = stream
826///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
827///     .try_collect::<Vec<i32>>()
828///     .await?;
829///
830/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
831///
832/// Ok(())
833/// # }
834/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
835/// # fn main() {}
836/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Forward to the runtime-generic implementation with Tokio as the runtime.
    let stream = generic::into_stream_v2::<TokioRuntime>(gen);
    stream
}