pyo3_async_runtimes/
tokio.rs

//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
//!
//! Items marked with
//! <span
//!   class="module-item stab portability"
//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
//! ><code>unstable-streams</code></span>
//! are only available when the `unstable-streams` Cargo feature is enabled:
//!
//! ```toml
//! [dependencies.pyo3-async-runtimes]
//! version = "0.24"
//! features = ["unstable-streams"]
//! ```
15
16use std::cell::OnceCell;
17use std::ops::Deref;
18use std::sync::OnceLock;
19use std::{future::Future, pin::Pin, sync::Mutex};
20
21use ::tokio::{
22    runtime::{Builder, Runtime},
23    task,
24};
25use once_cell::sync::Lazy;
26use pyo3::prelude::*;
27
28use crate::{
29    generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
30    TaskLocals,
31};
32
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Items re-exported for use by the tokio attribute macros, so that crates using those macros do
/// not have to declare the underlying dependencies themselves.
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// Re-export of `tokio::runtime`, used to build runtimes in tokio macros without an
    /// additional dependency.
    pub use tokio::runtime;
    /// Re-export of `futures::future::pending`, used in tokio macros without an additional
    /// dependency.
    pub use futures::future::pending;
}
42
43/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
44#[cfg(feature = "attributes")]
45pub use pyo3_async_runtimes_macros::tokio_main as main;
46
47/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
48/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
/// Registers a `tokio` test with the `pyo3-async-runtimes` test harness
50#[cfg(all(feature = "attributes", feature = "testing"))]
51pub use pyo3_async_runtimes_macros::tokio_test as test;
52
53enum Pyo3Runtime {
54    Borrowed(&'static Runtime),
55    Owned(Runtime),
56}
57impl Deref for Pyo3Runtime {
58    type Target = Runtime;
59    fn deref(&self) -> &Self::Target {
60        match self {
61            Self::Borrowed(rt) => rt,
62            Self::Owned(rt) => rt,
63        }
64    }
65}
66
67static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
68static TOKIO_RUNTIME: OnceLock<Pyo3Runtime> = OnceLock::new();
69
70impl generic::JoinError for task::JoinError {
71    fn is_panic(&self) -> bool {
72        task::JoinError::is_panic(self)
73    }
74    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
75        task::JoinError::into_panic(self)
76    }
77}
78
79struct TokioRuntime;
80
81tokio::task_local! {
82    static TASK_LOCALS: OnceCell<TaskLocals>;
83}
84
85impl GenericRuntime for TokioRuntime {
86    type JoinError = task::JoinError;
87    type JoinHandle = task::JoinHandle<()>;
88
89    fn spawn<F>(fut: F) -> Self::JoinHandle
90    where
91        F: Future<Output = ()> + Send + 'static,
92    {
93        get_runtime().spawn(async move {
94            fut.await;
95        })
96    }
97}
98
99impl ContextExt for TokioRuntime {
100    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
101    where
102        F: Future<Output = R> + Send + 'static,
103    {
104        let cell = OnceCell::new();
105        cell.set(locals).unwrap();
106
107        Box::pin(TASK_LOCALS.scope(cell, fut))
108    }
109
110    fn get_task_locals() -> Option<TaskLocals> {
111        TASK_LOCALS
112            .try_with(|c| {
113                c.get()
114                    .map(|locals| Python::attach(|py| locals.clone_ref(py)))
115            })
116            .unwrap_or_default()
117    }
118}
119
120impl SpawnLocalExt for TokioRuntime {
121    fn spawn_local<F>(fut: F) -> Self::JoinHandle
122    where
123        F: Future<Output = ()> + 'static,
124    {
125        tokio::task::spawn_local(fut)
126    }
127}
128
129impl LocalContextExt for TokioRuntime {
130    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
131    where
132        F: Future<Output = R> + 'static,
133    {
134        let cell = OnceCell::new();
135        cell.set(locals).unwrap();
136
137        Box::pin(TASK_LOCALS.scope(cell, fut))
138    }
139}
140
141/// Set the task local event loop for the given future
142pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
143where
144    F: Future<Output = R> + Send + 'static,
145{
146    TokioRuntime::scope(locals, fut).await
147}
148
149/// Set the task local event loop for the given !Send future
150pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
151where
152    F: Future<Output = R> + 'static,
153{
154    TokioRuntime::scope_local(locals, fut).await
155}
156
157/// Get the current event loop from either Python or Rust async task local context
158///
159/// This function first checks if the runtime has a task-local reference to the Python event loop.
160/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
161/// associated with the current OS thread.
162pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
163    generic::get_current_loop::<TokioRuntime>(py)
164}
165
166/// Either copy the task locals from the current task OR get the current running loop and
167/// contextvars from Python.
168pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
169    generic::get_current_locals::<TokioRuntime>(py)
170}
171
172/// Initialize the Tokio runtime with a custom build
173pub fn init(builder: Builder) {
174    *TOKIO_BUILDER.lock().unwrap() = builder
175}
176
177/// Initialize the Tokio runtime with a custom Tokio runtime
178///
179/// Returns Ok(()) if success and Err(()) if it had been inited.
180#[allow(clippy::result_unit_err)]
181pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
182    TOKIO_RUNTIME
183        .set(Pyo3Runtime::Borrowed(runtime))
184        .map_err(|_| ())
185}
186
187/// Get a reference to the current tokio runtime
188pub fn get_runtime<'a>() -> &'a Runtime {
189    TOKIO_RUNTIME.get_or_init(|| {
190        let rt = TOKIO_BUILDER
191            .lock()
192            .unwrap()
193            .build()
194            .expect("Unable to build Tokio runtime");
195        Pyo3Runtime::Owned(rt)
196    })
197}
198
199fn multi_thread() -> Builder {
200    let mut builder = Builder::new_multi_thread();
201    builder.enable_all();
202    builder
203}
204
205/// Run the event loop until the given Future completes
206///
207/// The event loop runs until the given future is complete.
208///
209/// After this function returns, the event loop can be resumed with [`run_until_complete`]
210///
211/// # Arguments
212/// * `event_loop` - The Python event loop that should run the future
213/// * `fut` - The future to drive to completion
214///
215/// # Examples
216///
217/// ```
218/// # use std::time::Duration;
219/// #
220/// # use pyo3::prelude::*;
221/// #
222/// # Python::initialize();
223/// # Python::attach(|py| -> PyResult<()> {
224/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
225/// pyo3_async_runtimes::tokio::run_until_complete(event_loop, async move {
226///     tokio::time::sleep(Duration::from_secs(1)).await;
227///     Ok(())
228/// })?;
229/// # Ok(())
230/// # }).unwrap();
231/// ```
232pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
233where
234    F: Future<Output = PyResult<T>> + Send + 'static,
235    T: Send + Sync + 'static,
236{
237    generic::run_until_complete::<TokioRuntime, _, T>(&event_loop, fut)
238}
239
240/// Run the event loop until the given Future completes
241///
242/// # Arguments
243/// * `py` - The current PyO3 GIL guard
244/// * `fut` - The future to drive to completion
245///
246/// # Examples
247///
248/// ```no_run
249/// # use std::time::Duration;
250/// #
251/// # use pyo3::prelude::*;
252/// #
253/// fn main() {
254///     Python::attach(|py| {
255///         pyo3_async_runtimes::tokio::run(py, async move {
256///             tokio::time::sleep(Duration::from_secs(1)).await;
257///             Ok(())
258///         })
259///         .map_err(|e| {
260///             e.print_and_set_sys_last_vars(py);
261///         })
262///         .unwrap();
263///     })
264/// }
265/// ```
266pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
267where
268    F: Future<Output = PyResult<T>> + Send + 'static,
269    T: Send + Sync + 'static,
270{
271    generic::run::<TokioRuntime, F, T>(py, fut)
272}
273
274/// Convert a Rust Future into a Python awaitable
275///
276/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
277/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
278///
279/// Python `contextvars` are preserved when calling async Python functions within the Rust future
280/// via [`into_future`] (new behaviour in `v0.15`).
281///
282/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
283/// > unfortunately fail to resolve them when called within the Rust future. This is because the
284/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
285/// >
286/// > As a workaround, you can get the `contextvars` from the current task locals using
287/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
288/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
289/// > synchronous function, and restore the previous context when it returns or raises an exception.
290///
291/// # Arguments
292/// * `py` - PyO3 GIL guard
293/// * `locals` - The task locals for the given future
294/// * `fut` - The Rust future to be converted
295///
296/// # Examples
297///
298/// ```
299/// use std::time::Duration;
300///
301/// use pyo3::prelude::*;
302///
303/// /// Awaitable sleep function
304/// #[pyfunction]
305/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
306///     let secs = secs.extract()?;
307///     pyo3_async_runtimes::tokio::future_into_py_with_locals(
308///         py,
309///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
310///         async move {
311///             tokio::time::sleep(Duration::from_secs(secs)).await;
312///             Python::attach(|py| Ok(py.None()))
313///         }
314///     )
315/// }
316/// ```
317pub fn future_into_py_with_locals<F, T>(
318    py: Python,
319    locals: TaskLocals,
320    fut: F,
321) -> PyResult<Bound<PyAny>>
322where
323    F: Future<Output = PyResult<T>> + Send + 'static,
324    T: for<'py> IntoPyObject<'py>,
325{
326    generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
327}
328
329/// Convert a Rust Future into a Python awaitable
330///
331/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
332/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
333///
334/// Python `contextvars` are preserved when calling async Python functions within the Rust future
335/// via [`into_future`] (new behaviour in `v0.15`).
336///
337/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
338/// > unfortunately fail to resolve them when called within the Rust future. This is because the
339/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
340/// >
341/// > As a workaround, you can get the `contextvars` from the current task locals using
342/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
343/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
344/// > synchronous function, and restore the previous context when it returns or raises an exception.
345///
346/// # Arguments
347/// * `py` - The current PyO3 GIL guard
348/// * `fut` - The Rust future to be converted
349///
350/// # Examples
351///
352/// ```
353/// use std::time::Duration;
354///
355/// use pyo3::prelude::*;
356///
357/// /// Awaitable sleep function
358/// #[pyfunction]
359/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
360///     let secs = secs.extract()?;
361///     pyo3_async_runtimes::tokio::future_into_py(py, async move {
362///         tokio::time::sleep(Duration::from_secs(secs)).await;
363///         Ok(())
364///     })
365/// }
366/// ```
367pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
368where
369    F: Future<Output = PyResult<T>> + Send + 'static,
370    T: for<'py> IntoPyObject<'py>,
371{
372    generic::future_into_py::<TokioRuntime, _, T>(py, fut)
373}
374
375/// Convert a `!Send` Rust Future into a Python awaitable
376///
377/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
378/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
379///
380/// Python `contextvars` are preserved when calling async Python functions within the Rust future
381/// via [`into_future`] (new behaviour in `v0.15`).
382///
383/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
384/// > unfortunately fail to resolve them when called within the Rust future. This is because the
385/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
386/// >
387/// > As a workaround, you can get the `contextvars` from the current task locals using
388/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
389/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
390/// > synchronous function, and restore the previous context when it returns or raises an exception.
391///
392/// # Arguments
393/// * `py` - PyO3 GIL guard
394/// * `locals` - The task locals for the given future
395/// * `fut` - The Rust future to be converted
396///
397/// # Examples
398///
399/// ```
400/// use std::{rc::Rc, time::Duration};
401///
402/// use pyo3::prelude::*;
403///
404/// /// Awaitable non-send sleep function
405/// #[pyfunction]
406/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
407///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
408///     let secs = Rc::new(secs);
409///
410///     pyo3_async_runtimes::tokio::local_future_into_py_with_locals(
411///         py,
412///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
413///         async move {
414///             tokio::time::sleep(Duration::from_secs(*secs)).await;
415///             Python::attach(|py| Ok(py.None()))
416///         }
417///     )
418/// }
419///
420/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
421/// #[pyo3_async_runtimes::tokio::main]
422/// async fn main() -> PyResult<()> {
423///     let locals = Python::attach(|py| -> PyResult<_> {
424///         pyo3_async_runtimes::tokio::get_current_locals(py)
425///     })?;
426///
427///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
428///     // we use spawn_blocking in order to use LocalSet::block_on
429///     tokio::task::spawn_blocking(move || {
430///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
431///         // pyo3_async_runtimes::tokio::local_future_into_py will panic.
432///         tokio::task::LocalSet::new().block_on(
433///             pyo3_async_runtimes::tokio::get_runtime(),
434///             pyo3_async_runtimes::tokio::scope_local(locals, async {
435///                 Python::attach(|py| {
436///                     let py_future = sleep_for(py, 1)?;
437///                     pyo3_async_runtimes::tokio::into_future(py_future)
438///                 })?
439///                 .await?;
440///
441///                 Ok(())
442///             })
443///         )
444///     }).await.unwrap()
445/// }
446/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
447/// # fn main() {}
448/// ```
449#[deprecated(
450    since = "0.18.0",
451    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
452)]
453#[allow(deprecated)]
454pub fn local_future_into_py_with_locals<F, T>(
455    py: Python,
456    locals: TaskLocals,
457    fut: F,
458) -> PyResult<Bound<PyAny>>
459where
460    F: Future<Output = PyResult<T>> + 'static,
461    T: for<'py> IntoPyObject<'py>,
462{
463    generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
464}
465
466/// Convert a `!Send` Rust Future into a Python awaitable
467///
468/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
469/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
470///
471/// Python `contextvars` are preserved when calling async Python functions within the Rust future
472/// via [`into_future`] (new behaviour in `v0.15`).
473///
474/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
475/// > unfortunately fail to resolve them when called within the Rust future. This is because the
476/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
477/// >
478/// > As a workaround, you can get the `contextvars` from the current task locals using
479/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
480/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
481/// > synchronous function, and restore the previous context when it returns or raises an exception.
482///
483/// # Arguments
484/// * `py` - The current PyO3 GIL guard
485/// * `fut` - The Rust future to be converted
486///
487/// # Examples
488///
489/// ```
490/// use std::{rc::Rc, time::Duration};
491///
492/// use pyo3::prelude::*;
493///
494/// /// Awaitable non-send sleep function
495/// #[pyfunction]
496/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
497///     // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
498///     let secs = Rc::new(secs);
499///     pyo3_async_runtimes::tokio::local_future_into_py(py, async move {
500///         tokio::time::sleep(Duration::from_secs(*secs)).await;
501///         Ok(())
502///     })
503/// }
504///
505/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
506/// #[pyo3_async_runtimes::tokio::main]
507/// async fn main() -> PyResult<()> {
508///     let locals = Python::attach(|py| {
509///         pyo3_async_runtimes::tokio::get_current_locals(py).unwrap()
510///     });
511///
512///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
513///     // we use spawn_blocking in order to use LocalSet::block_on
514///     tokio::task::spawn_blocking(move || {
515///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
516///         // pyo3_async_runtimes::tokio::local_future_into_py will panic.
517///         tokio::task::LocalSet::new().block_on(
518///             pyo3_async_runtimes::tokio::get_runtime(),
519///             pyo3_async_runtimes::tokio::scope_local(locals, async {
520///                 Python::attach(|py| {
521///                     let py_future = sleep_for(py, 1)?;
522///                     pyo3_async_runtimes::tokio::into_future(py_future)
523///                 })?
524///                 .await?;
525///
526///                 Ok(())
527///             })
528///         )
529///     }).await.unwrap()
530/// }
531/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
532/// # fn main() {}
533/// ```
534#[deprecated(
535    since = "0.18.0",
536    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
537)]
538#[allow(deprecated)]
539pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
540where
541    F: Future<Output = PyResult<T>> + 'static,
542    T: for<'py> IntoPyObject<'py>,
543{
544    generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
545}
546
547/// Convert a Python `awaitable` into a Rust Future
548///
549/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
550/// completion handler sends the result of this Task through a
551/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
552/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
553///
554/// # Arguments
555/// * `awaitable` - The Python `awaitable` to be converted
556///
557/// # Examples
558///
559/// ```
560/// use std::time::Duration;
561/// use std::ffi::CString;
562///
563/// use pyo3::prelude::*;
564///
565/// const PYTHON_CODE: &'static str = r#"
566/// import asyncio
567///
568/// async def py_sleep(duration):
569///     await asyncio.sleep(duration)
570/// "#;
571///
572/// async fn py_sleep(seconds: f32) -> PyResult<()> {
573///     let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
574///         Ok(
575///             PyModule::from_code(
576///                 py,
577///                 &CString::new(PYTHON_CODE).unwrap(),
578///                 &CString::new("test_into_future/test_mod.py").unwrap(),
579///                 &CString::new("test_mod").unwrap(),
580///             )?
581///             .into()
582///         )
583///     })?;
584///
585///     Python::attach(|py| {
586///         pyo3_async_runtimes::tokio::into_future(
587///             test_mod
588///                 .call_method1(py, "py_sleep", (seconds,))?
589///                 .into_bound(py),
590///         )
591///     })?
592///     .await?;
593///     Ok(())
594/// }
595/// ```
596pub fn into_future(
597    awaitable: Bound<PyAny>,
598) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
599    generic::into_future::<TokioRuntime>(awaitable)
600}
601
602/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
603///
604/// **This API is marked as unstable** and is only available when the
605/// `unstable-streams` crate feature is enabled. This comes with no
606/// stability guarantees, and could be changed or removed at any time.
607///
608/// # Arguments
609/// * `locals` - The current task locals
610/// * `gen` - The Python async generator to be converted
611///
612/// # Examples
613/// ```
614/// use pyo3::prelude::*;
615/// use futures::{StreamExt, TryStreamExt};
616/// use std::ffi::CString;
617///
618/// const TEST_MOD: &str = r#"
619/// import asyncio
620///
621/// async def gen():
622///     for i in range(10):
623///         await asyncio.sleep(0.1)
624///         yield i
625/// "#;
626///
627/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
628/// # #[pyo3_async_runtimes::tokio::main]
629/// # async fn main() -> PyResult<()> {
630/// let stream = Python::attach(|py| {
631///     let test_mod = PyModule::from_code(
632///         py,
633///         &CString::new(TEST_MOD).unwrap(),
634///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
635///         &CString::new("test_mod").unwrap(),
636///     )?;
637///
638///     pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
639///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
640///         test_mod.call_method0("gen")?
641///     )
642/// })?;
643///
644/// let vals = stream
645///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
646///     .try_collect::<Vec<i32>>()
647///     .await?;
648///
649/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
650///
651/// Ok(())
652/// # }
653/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
654/// # fn main() {}
655/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    // NOTE: `gen` is a reserved keyword from the 2024 edition onwards; this parameter name only
    // compiles on earlier editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Tokio-specialised entry point for the runtime-agnostic conversion.
    generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}
663
664/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
665///
666/// **This API is marked as unstable** and is only available when the
667/// `unstable-streams` crate feature is enabled. This comes with no
668/// stability guarantees, and could be changed or removed at any time.
669///
670/// # Arguments
671/// * `gen` - The Python async generator to be converted
672///
673/// # Examples
674/// ```
675/// use pyo3::prelude::*;
676/// use futures::{StreamExt, TryStreamExt};
677/// use std::ffi::CString;
678///
679/// const TEST_MOD: &str = r#"
680/// import asyncio
681///
682/// async def gen():
683///     for i in range(10):
684///         await asyncio.sleep(0.1)
685///         yield i
686/// "#;
687///
688/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
689/// # #[pyo3_async_runtimes::tokio::main]
690/// # async fn main() -> PyResult<()> {
691/// let stream = Python::attach(|py| {
692///     let test_mod = PyModule::from_code(
693///         py,
694///         &CString::new(TEST_MOD).unwrap(),
695///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
696///         &CString::new("test_mod").unwrap(),
697///     )?;
698///
699///     pyo3_async_runtimes::tokio::into_stream_v1(test_mod.call_method0("gen")?)
700/// })?;
701///
702/// let vals = stream
703///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
704///     .try_collect::<Vec<i32>>()
705///     .await?;
706///
707/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
708///
709/// Ok(())
710/// # }
711/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
712/// # fn main() {}
713/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    // NOTE: `gen` is a reserved keyword from the 2024 edition onwards; this parameter name only
    // compiles on earlier editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Tokio-specialised entry point for the runtime-agnostic conversion.
    generic::into_stream_v1::<TokioRuntime>(gen)
}
720
721/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
722///
723/// **This API is marked as unstable** and is only available when the
724/// `unstable-streams` crate feature is enabled. This comes with no
725/// stability guarantees, and could be changed or removed at any time.
726///
727/// # Arguments
728/// * `locals` - The current task locals
729/// * `gen` - The Python async generator to be converted
730///
731/// # Examples
732/// ```
733/// use pyo3::prelude::*;
734/// use futures::{StreamExt, TryStreamExt};
735/// use std::ffi::CString;
736///
737/// const TEST_MOD: &str = r#"
738/// import asyncio
739///
740/// async def gen():
741///     for i in range(10):
742///         await asyncio.sleep(0.1)
743///         yield i
744/// "#;
745///
746/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
747/// # #[pyo3_async_runtimes::tokio::main]
748/// # async fn main() -> PyResult<()> {
749/// let stream = Python::attach(|py| {
750///     let test_mod = PyModule::from_code(
751///         py,
752///         &CString::new(TEST_MOD).unwrap(),
753///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
754///         &CString::new("test_mod").unwrap(),
755///     )?;
756///
757///     pyo3_async_runtimes::tokio::into_stream_with_locals_v2(
758///         pyo3_async_runtimes::tokio::get_current_locals(py)?,
759///         test_mod.call_method0("gen")?
760///     )
761/// })?;
762///
763/// let vals = stream
764///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
765///     .try_collect::<Vec<i32>>()
766///     .await?;
767///
768/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
769///
770/// Ok(())
771/// # }
772/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
773/// # fn main() {}
774/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    // NOTE: `gen` is a reserved keyword from the 2024 edition onwards; this parameter name only
    // compiles on earlier editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Tokio-specialised entry point for the runtime-agnostic conversion.
    generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}
782
783/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
784///
785/// **This API is marked as unstable** and is only available when the
786/// `unstable-streams` crate feature is enabled. This comes with no
787/// stability guarantees, and could be changed or removed at any time.
788///
789/// # Arguments
790/// * `gen` - The Python async generator to be converted
791///
792/// # Examples
793/// ```
794/// use pyo3::prelude::*;
795/// use futures::{StreamExt, TryStreamExt};
796/// use std::ffi::CString;
797///
798/// const TEST_MOD: &str = r#"
799/// import asyncio
800///
801/// async def gen():
802///     for i in range(10):
803///         await asyncio.sleep(0.1)
804///         yield i
805/// "#;
806///
807/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
808/// # #[pyo3_async_runtimes::tokio::main]
809/// # async fn main() -> PyResult<()> {
810/// let stream = Python::attach(|py| {
811///     let test_mod = PyModule::from_code(
812///         py,
813///         &CString::new(TEST_MOD).unwrap(),
814///         &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
815///         &CString::new("test_mod").unwrap(),
816///     )?;
817///
818///     pyo3_async_runtimes::tokio::into_stream_v2(test_mod.call_method0("gen")?)
819/// })?;
820///
821/// let vals = stream
822///     .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
823///     .try_collect::<Vec<i32>>()
824///     .await?;
825///
826/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
827///
828/// Ok(())
829/// # }
830/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
831/// # fn main() {}
832/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    // NOTE: `gen` is a reserved keyword from the 2024 edition onwards; this parameter name only
    // compiles on earlier editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Tokio-specialised entry point for the runtime-agnostic conversion.
    generic::into_stream_v2::<TokioRuntime>(gen)
}