pyo3_asyncio/
tokio.rs

1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
2//!
3//! Items marked with
4//! <span
5//!   class="module-item stab portability"
6//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio]
12//! version = "0.20"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::ops::Deref;
17use std::{future::Future, pin::Pin, sync::Mutex};
18
19use ::tokio::{
20    runtime::{Builder, Runtime},
21    task,
22};
23use once_cell::{
24    sync::{Lazy, OnceCell},
25    unsync::OnceCell as UnsyncOnceCell,
26};
27use pyo3::prelude::*;
28
29use crate::{
30    generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
31    TaskLocals,
32};
33
34/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
35/// re-exports for macros
36#[cfg(feature = "attributes")]
37pub mod re_exports {
38    /// re-export pending to be used in tokio macros without additional dependency
39    pub use futures::future::pending;
40    /// re-export tokio::runtime to build runtimes in tokio macros without additional dependency
41    pub use tokio::runtime;
42}
43
44/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
45#[cfg(feature = "attributes")]
46pub use pyo3_asyncio_macros::tokio_main as main;
47
48/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
49/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
50/// Registers a `tokio` test with the `pyo3-asyncio` test harness
51#[cfg(all(feature = "attributes", feature = "testing"))]
52pub use pyo3_asyncio_macros::tokio_test as test;
53
54enum Pyo3Runtime {
55    Borrowed(&'static Runtime),
56    Owned(Runtime),
57}
58impl Deref for Pyo3Runtime {
59    type Target = Runtime;
60    fn deref(&self) -> &Self::Target {
61        match self {
62            Self::Borrowed(rt) => rt,
63            Self::Owned(rt) => rt,
64        }
65    }
66}
67
68static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
69static TOKIO_RUNTIME: OnceCell<Pyo3Runtime> = OnceCell::new();
70
71impl generic::JoinError for task::JoinError {
72    fn is_panic(&self) -> bool {
73        task::JoinError::is_panic(self)
74    }
75    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
76        task::JoinError::into_panic(self)
77    }
78}
79
80struct TokioRuntime;
81
82tokio::task_local! {
83    static TASK_LOCALS: UnsyncOnceCell<TaskLocals>;
84}
85
86impl GenericRuntime for TokioRuntime {
87    type JoinError = task::JoinError;
88    type JoinHandle = task::JoinHandle<()>;
89
90    fn spawn<F>(fut: F) -> Self::JoinHandle
91    where
92        F: Future<Output = ()> + Send + 'static,
93    {
94        get_runtime().spawn(async move {
95            fut.await;
96        })
97    }
98}
99
100impl ContextExt for TokioRuntime {
101    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
102    where
103        F: Future<Output = R> + Send + 'static,
104    {
105        let cell = UnsyncOnceCell::new();
106        cell.set(locals).unwrap();
107
108        Box::pin(TASK_LOCALS.scope(cell, fut))
109    }
110
111    fn get_task_locals() -> Option<TaskLocals> {
112        match TASK_LOCALS.try_with(|c| c.get().map(|locals| locals.clone())) {
113            Ok(locals) => locals,
114            Err(_) => None,
115        }
116    }
117}
118
119impl SpawnLocalExt for TokioRuntime {
120    fn spawn_local<F>(fut: F) -> Self::JoinHandle
121    where
122        F: Future<Output = ()> + 'static,
123    {
124        tokio::task::spawn_local(fut)
125    }
126}
127
128impl LocalContextExt for TokioRuntime {
129    fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
130    where
131        F: Future<Output = R> + 'static,
132    {
133        let cell = UnsyncOnceCell::new();
134        cell.set(locals).unwrap();
135
136        Box::pin(TASK_LOCALS.scope(cell, fut))
137    }
138}
139
140/// Set the task local event loop for the given future
141pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
142where
143    F: Future<Output = R> + Send + 'static,
144{
145    TokioRuntime::scope(locals, fut).await
146}
147
148/// Set the task local event loop for the given !Send future
149pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
150where
151    F: Future<Output = R> + 'static,
152{
153    TokioRuntime::scope_local(locals, fut).await
154}
155
156/// Get the current event loop from either Python or Rust async task local context
157///
158/// This function first checks if the runtime has a task-local reference to the Python event loop.
159/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
160/// associated with the current OS thread.
161pub fn get_current_loop(py: Python) -> PyResult<&PyAny> {
162    generic::get_current_loop::<TokioRuntime>(py)
163}
164
165/// Either copy the task locals from the current task OR get the current running loop and
166/// contextvars from Python.
167pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
168    generic::get_current_locals::<TokioRuntime>(py)
169}
170
171/// Initialize the Tokio runtime with a custom build
172pub fn init(builder: Builder) {
173    *TOKIO_BUILDER.lock().unwrap() = builder
174}
175
176/// Initialize the Tokio runtime with a custom Tokio runtime
177///
178/// Returns Ok(()) if success and Err(()) if it had been inited.
179pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
180    TOKIO_RUNTIME
181        .set(Pyo3Runtime::Borrowed(runtime))
182        .map_err(|_| ())
183}
184
185/// Get a reference to the current tokio runtime
186pub fn get_runtime<'a>() -> &'a Runtime {
187    TOKIO_RUNTIME.get_or_init(|| {
188        let rt = TOKIO_BUILDER
189            .lock()
190            .unwrap()
191            .build()
192            .expect("Unable to build Tokio runtime");
193        Pyo3Runtime::Owned(rt)
194    })
195}
196
197fn multi_thread() -> Builder {
198    let mut builder = Builder::new_multi_thread();
199    builder.enable_all();
200    builder
201}
202
203/// Run the event loop until the given Future completes
204///
205/// The event loop runs until the given future is complete.
206///
207/// After this function returns, the event loop can be resumed with [`run_until_complete`]
208///
209/// # Arguments
210/// * `event_loop` - The Python event loop that should run the future
211/// * `fut` - The future to drive to completion
212///
213/// # Examples
214///
215/// ```
216/// # use std::time::Duration;
217/// #
218/// # use pyo3::prelude::*;
219/// #
220/// # pyo3::prepare_freethreaded_python();
221/// # Python::with_gil(|py| -> PyResult<()> {
222/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
223/// pyo3_asyncio::tokio::run_until_complete(event_loop, async move {
224///     tokio::time::sleep(Duration::from_secs(1)).await;
225///     Ok(())
226/// })?;
227/// # Ok(())
228/// # }).unwrap();
229/// ```
230pub fn run_until_complete<F, T>(event_loop: &PyAny, fut: F) -> PyResult<T>
231where
232    F: Future<Output = PyResult<T>> + Send + 'static,
233    T: Send + Sync + 'static,
234{
235    generic::run_until_complete::<TokioRuntime, _, T>(event_loop, fut)
236}
237
238/// Run the event loop until the given Future completes
239///
240/// # Arguments
241/// * `py` - The current PyO3 GIL guard
242/// * `fut` - The future to drive to completion
243///
244/// # Examples
245///
246/// ```no_run
247/// # use std::time::Duration;
248/// #
249/// # use pyo3::prelude::*;
250/// #
251/// fn main() {
252///     Python::with_gil(|py| {
253///         pyo3_asyncio::tokio::run(py, async move {
254///             tokio::time::sleep(Duration::from_secs(1)).await;
255///             Ok(())
256///         })
257///         .map_err(|e| {
258///             e.print_and_set_sys_last_vars(py);
259///         })
260///         .unwrap();
261///     })
262/// }
263/// ```
264pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
265where
266    F: Future<Output = PyResult<T>> + Send + 'static,
267    T: Send + Sync + 'static,
268{
269    generic::run::<TokioRuntime, F, T>(py, fut)
270}
271
272/// Convert a Rust Future into a Python awaitable
273///
274/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
275/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
276///
277/// Python `contextvars` are preserved when calling async Python functions within the Rust future
278/// via [`into_future`] (new behaviour in `v0.15`).
279///
280/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
281/// unfortunately fail to resolve them when called within the Rust future. This is because the
282/// function is being called from a Rust thread, not inside an actual Python coroutine context.
283/// >
284/// > As a workaround, you can get the `contextvars` from the current task locals using
285/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
286/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
287/// synchronous function, and restore the previous context when it returns or raises an exception.
288///
289/// # Arguments
290/// * `py` - PyO3 GIL guard
291/// * `locals` - The task locals for the given future
292/// * `fut` - The Rust future to be converted
293///
294/// # Examples
295///
296/// ```
297/// use std::time::Duration;
298///
299/// use pyo3::prelude::*;
300///
301/// /// Awaitable sleep function
302/// #[pyfunction]
303/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
304///     let secs = secs.extract()?;
305///     pyo3_asyncio::tokio::future_into_py_with_locals(
306///         py,
307///         pyo3_asyncio::tokio::get_current_locals(py)?,
308///         async move {
309///             tokio::time::sleep(Duration::from_secs(secs)).await;
310///             Python::with_gil(|py| Ok(py.None()))
311///         }
312///     )
313/// }
314/// ```
315pub fn future_into_py_with_locals<F, T>(py: Python, locals: TaskLocals, fut: F) -> PyResult<&PyAny>
316where
317    F: Future<Output = PyResult<T>> + Send + 'static,
318    T: IntoPy<PyObject>,
319{
320    generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
321}
322
323/// Convert a Rust Future into a Python awaitable
324///
325/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
326/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
327///
328/// Python `contextvars` are preserved when calling async Python functions within the Rust future
329/// via [`into_future`] (new behaviour in `v0.15`).
330///
331/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
332/// unfortunately fail to resolve them when called within the Rust future. This is because the
333/// function is being called from a Rust thread, not inside an actual Python coroutine context.
334/// >
335/// > As a workaround, you can get the `contextvars` from the current task locals using
336/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
337/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
338/// synchronous function, and restore the previous context when it returns or raises an exception.
339///
340/// # Arguments
341/// * `py` - The current PyO3 GIL guard
342/// * `fut` - The Rust future to be converted
343///
344/// # Examples
345///
346/// ```
347/// use std::time::Duration;
348///
349/// use pyo3::prelude::*;
350///
351/// /// Awaitable sleep function
352/// #[pyfunction]
353/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
354///     let secs = secs.extract()?;
355///     pyo3_asyncio::tokio::future_into_py(py, async move {
356///         tokio::time::sleep(Duration::from_secs(secs)).await;
357///         Ok(())
358///     })
359/// }
360/// ```
361pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
362where
363    F: Future<Output = PyResult<T>> + Send + 'static,
364    T: IntoPy<PyObject>,
365{
366    generic::future_into_py::<TokioRuntime, _, T>(py, fut)
367}
368
369/// Convert a `!Send` Rust Future into a Python awaitable
370///
371/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
372/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
373///
374/// Python `contextvars` are preserved when calling async Python functions within the Rust future
375/// via [`into_future`] (new behaviour in `v0.15`).
376///
377/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
378/// unfortunately fail to resolve them when called within the Rust future. This is because the
379/// function is being called from a Rust thread, not inside an actual Python coroutine context.
380/// >
381/// > As a workaround, you can get the `contextvars` from the current task locals using
382/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
383/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
384/// synchronous function, and restore the previous context when it returns or raises an exception.
385///
386/// # Arguments
387/// * `py` - PyO3 GIL guard
388/// * `locals` - The task locals for the given future
389/// * `fut` - The Rust future to be converted
390///
391/// # Examples
392///
393/// ```
394/// use std::{rc::Rc, time::Duration};
395///
396/// use pyo3::prelude::*;
397///
398/// /// Awaitable non-send sleep function
399/// #[pyfunction]
400/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
401///     // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py
402///     let secs = Rc::new(secs);
403///
404///     pyo3_asyncio::tokio::local_future_into_py_with_locals(
405///         py,
406///         pyo3_asyncio::tokio::get_current_locals(py)?,
407///         async move {
408///             tokio::time::sleep(Duration::from_secs(*secs)).await;
409///             Python::with_gil(|py| Ok(py.None()))
410///         }
411///     )
412/// }
413///
414/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
415/// #[pyo3_asyncio::tokio::main]
416/// async fn main() -> PyResult<()> {
417///     let locals = Python::with_gil(|py| -> PyResult<_> {
418///         pyo3_asyncio::tokio::get_current_locals(py)
419///     })?;
420///
421///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
422///     // we use spawn_blocking in order to use LocalSet::block_on
423///     tokio::task::spawn_blocking(move || {
424///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
425///         // pyo3_asyncio::tokio::local_future_into_py will panic.
426///         tokio::task::LocalSet::new().block_on(
427///             pyo3_asyncio::tokio::get_runtime(),
428///             pyo3_asyncio::tokio::scope_local(locals, async {
429///                 Python::with_gil(|py| {
430///                     let py_future = sleep_for(py, 1)?;
431///                     pyo3_asyncio::tokio::into_future(py_future)
432///                 })?
433///                 .await?;
434///
435///                 Ok(())
436///             })
437///         )
438///     }).await.unwrap()
439/// }
440/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
441/// # fn main() {}
442/// ```
443#[deprecated(
444    since = "0.18.0",
445    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
446)]
447#[allow(deprecated)]
448pub fn local_future_into_py_with_locals<F, T>(
449    py: Python,
450    locals: TaskLocals,
451    fut: F,
452) -> PyResult<&PyAny>
453where
454    F: Future<Output = PyResult<T>> + 'static,
455    T: IntoPy<PyObject>,
456{
457    generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
458}
459
460/// Convert a `!Send` Rust Future into a Python awaitable
461///
462/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
463/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
464///
465/// Python `contextvars` are preserved when calling async Python functions within the Rust future
466/// via [`into_future`] (new behaviour in `v0.15`).
467///
468/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
469/// unfortunately fail to resolve them when called within the Rust future. This is because the
470/// function is being called from a Rust thread, not inside an actual Python coroutine context.
471/// >
472/// > As a workaround, you can get the `contextvars` from the current task locals using
473/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
474/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
475/// synchronous function, and restore the previous context when it returns or raises an exception.
476///
477/// # Arguments
478/// * `py` - The current PyO3 GIL guard
479/// * `fut` - The Rust future to be converted
480///
481/// # Examples
482///
483/// ```
484/// use std::{rc::Rc, time::Duration};
485///
486/// use pyo3::prelude::*;
487///
488/// /// Awaitable non-send sleep function
489/// #[pyfunction]
490/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
491///     // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py
492///     let secs = Rc::new(secs);
493///     pyo3_asyncio::tokio::local_future_into_py(py, async move {
494///         tokio::time::sleep(Duration::from_secs(*secs)).await;
495///         Ok(())
496///     })
497/// }
498///
499/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
500/// #[pyo3_asyncio::tokio::main]
501/// async fn main() -> PyResult<()> {
502///     let locals = Python::with_gil(|py| {
503///         pyo3_asyncio::tokio::get_current_locals(py).unwrap()
504///     });
505///
506///     // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
507///     // we use spawn_blocking in order to use LocalSet::block_on
508///     tokio::task::spawn_blocking(move || {
509///         // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
510///         // pyo3_asyncio::tokio::local_future_into_py will panic.
511///         tokio::task::LocalSet::new().block_on(
512///             pyo3_asyncio::tokio::get_runtime(),
513///             pyo3_asyncio::tokio::scope_local(locals, async {
514///                 Python::with_gil(|py| {
515///                     let py_future = sleep_for(py, 1)?;
516///                     pyo3_asyncio::tokio::into_future(py_future)
517///                 })?
518///                 .await?;
519///
520///                 Ok(())
521///             })
522///         )
523///     }).await.unwrap()
524/// }
525/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
526/// # fn main() {}
527/// ```
528#[deprecated(
529    since = "0.18.0",
530    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
531)]
532#[allow(deprecated)]
533pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
534where
535    F: Future<Output = PyResult<T>> + 'static,
536    T: IntoPy<PyObject>,
537{
538    generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
539}
540
541/// Convert a Python `awaitable` into a Rust Future
542///
543/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
544/// completion handler sends the result of this Task through a
545/// `futures::channel::oneshot::Sender<PyResult<PyObject>>` and the future returned by this function
546/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<PyObject>>`.
547///
548/// # Arguments
549/// * `awaitable` - The Python `awaitable` to be converted
550///
551/// # Examples
552///
553/// ```
554/// use std::time::Duration;
555///
556/// use pyo3::prelude::*;
557///
558/// const PYTHON_CODE: &'static str = r#"
559/// import asyncio
560///
561/// async def py_sleep(duration):
562///     await asyncio.sleep(duration)
563/// "#;
564///
565/// async fn py_sleep(seconds: f32) -> PyResult<()> {
566///     let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
567///         Ok(
568///             PyModule::from_code(
569///                 py,
570///                 PYTHON_CODE,
571///                 "test_into_future/test_mod.py",
572///                 "test_mod"
573///             )?
574///             .into()
575///         )
576///     })?;
577///
578///     Python::with_gil(|py| {
579///         pyo3_asyncio::tokio::into_future(
580///             test_mod
581///                 .call_method1(py, "py_sleep", (seconds.into_py(py),))?
582///                 .as_ref(py),
583///         )
584///     })?
585///     .await?;
586///     Ok(())
587/// }
588/// ```
589pub fn into_future(awaitable: &PyAny) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
590    generic::into_future::<TokioRuntime>(awaitable)
591}
592
593/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
594///
595/// **This API is marked as unstable** and is only available when the
596/// `unstable-streams` crate feature is enabled. This comes with no
597/// stability guarantees, and could be changed or removed at any time.
598///
599/// # Arguments
600/// * `locals` - The current task locals
601/// * `gen` - The Python async generator to be converted
602///
603/// # Examples
604/// ```
605/// use pyo3::prelude::*;
606/// use futures::{StreamExt, TryStreamExt};
607///
608/// const TEST_MOD: &str = r#"
609/// import asyncio
610///
611/// async def gen():
612///     for i in range(10):
613///         await asyncio.sleep(0.1)
614///         yield i
615/// "#;
616///
617/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
618/// # #[pyo3_asyncio::tokio::main]
619/// # async fn main() -> PyResult<()> {
620/// let stream = Python::with_gil(|py| {
621///     let test_mod = PyModule::from_code(
622///         py,
623///         TEST_MOD,
624///         "test_rust_coroutine/test_mod.py",
625///         "test_mod",
626///     )?;
627///
628///     pyo3_asyncio::tokio::into_stream_with_locals_v1(
629///         pyo3_asyncio::tokio::get_current_locals(py)?,
630///         test_mod.call_method0("gen")?
631///     )
632/// })?;
633///
634/// let vals = stream
635///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
636///     .try_collect::<Vec<i32>>()
637///     .await?;
638///
639/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
640///
641/// Ok(())
642/// # }
643/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
644/// # fn main() {}
645/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    gen: &PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Delegate to the runtime-agnostic implementation with tokio selected as the runtime.
    generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}
653
654/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
655///
656/// **This API is marked as unstable** and is only available when the
657/// `unstable-streams` crate feature is enabled. This comes with no
658/// stability guarantees, and could be changed or removed at any time.
659///
660/// # Arguments
661/// * `gen` - The Python async generator to be converted
662///
663/// # Examples
664/// ```
665/// use pyo3::prelude::*;
666/// use futures::{StreamExt, TryStreamExt};
667///
668/// const TEST_MOD: &str = r#"
669/// import asyncio
670///
671/// async def gen():
672///     for i in range(10):
673///         await asyncio.sleep(0.1)
674///         yield i
675/// "#;
676///
677/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
678/// # #[pyo3_asyncio::tokio::main]
679/// # async fn main() -> PyResult<()> {
680/// let stream = Python::with_gil(|py| {
681///     let test_mod = PyModule::from_code(
682///         py,
683///         TEST_MOD,
684///         "test_rust_coroutine/test_mod.py",
685///         "test_mod",
686///     )?;
687///
688///     pyo3_asyncio::tokio::into_stream_v1(test_mod.call_method0("gen")?)
689/// })?;
690///
691/// let vals = stream
692///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
693///     .try_collect::<Vec<i32>>()
694///     .await?;
695///
696/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
697///
698/// Ok(())
699/// # }
700/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
701/// # fn main() {}
702/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    gen: &PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Delegate to the runtime-agnostic implementation with tokio selected as the runtime.
    generic::into_stream_v1::<TokioRuntime>(gen)
}
709
710/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
711///
712/// **This API is marked as unstable** and is only available when the
713/// `unstable-streams` crate feature is enabled. This comes with no
714/// stability guarantees, and could be changed or removed at any time.
715///
716/// # Arguments
717/// * `locals` - The current task locals
718/// * `gen` - The Python async generator to be converted
719///
720/// # Examples
721/// ```
722/// use pyo3::prelude::*;
723/// use futures::{StreamExt, TryStreamExt};
724///
725/// const TEST_MOD: &str = r#"
726/// import asyncio
727///
728/// async def gen():
729///     for i in range(10):
730///         await asyncio.sleep(0.1)
731///         yield i
732/// "#;
733///
734/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
735/// # #[pyo3_asyncio::tokio::main]
736/// # async fn main() -> PyResult<()> {
737/// let stream = Python::with_gil(|py| {
738///     let test_mod = PyModule::from_code(
739///         py,
740///         TEST_MOD,
741///         "test_rust_coroutine/test_mod.py",
742///         "test_mod",
743///     )?;
744///
745///     pyo3_asyncio::tokio::into_stream_with_locals_v2(
746///         pyo3_asyncio::tokio::get_current_locals(py)?,
747///         test_mod.call_method0("gen")?
748///     )
749/// })?;
750///
751/// let vals = stream
752///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
753///     .try_collect::<Vec<i32>>()
754///     .await?;
755///
756/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
757///
758/// Ok(())
759/// # }
760/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
761/// # fn main() {}
762/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    gen: &PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Delegate to the runtime-agnostic implementation with tokio selected as the runtime.
    generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}
770
771/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
772///
773/// **This API is marked as unstable** and is only available when the
774/// `unstable-streams` crate feature is enabled. This comes with no
775/// stability guarantees, and could be changed or removed at any time.
776///
777/// # Arguments
778/// * `gen` - The Python async generator to be converted
779///
780/// # Examples
781/// ```
782/// use pyo3::prelude::*;
783/// use futures::{StreamExt, TryStreamExt};
784///
785/// const TEST_MOD: &str = r#"
786/// import asyncio
787///
788/// async def gen():
789///     for i in range(10):
790///         await asyncio.sleep(0.1)
791///         yield i
792/// "#;
793///
794/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
795/// # #[pyo3_asyncio::tokio::main]
796/// # async fn main() -> PyResult<()> {
797/// let stream = Python::with_gil(|py| {
798///     let test_mod = PyModule::from_code(
799///         py,
800///         TEST_MOD,
801///         "test_rust_coroutine/test_mod.py",
802///         "test_mod",
803///     )?;
804///
805///     pyo3_asyncio::tokio::into_stream_v2(test_mod.call_method0("gen")?)
806/// })?;
807///
808/// let vals = stream
809///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
810///     .try_collect::<Vec<i32>>()
811///     .await?;
812///
813/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
814///
815/// Ok(())
816/// # }
817/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
818/// # fn main() {}
819/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    gen: &PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Delegate to the runtime-agnostic implementation with tokio selected as the runtime.
    generic::into_stream_v2::<TokioRuntime>(gen)
}