pyo3_asyncio/tokio.rs
//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
//!
//! Items marked with
//! <span
//!   class="module-item stab portability"
//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
//! ><code>unstable-streams</code></span>
//! are only available when the `unstable-streams` Cargo feature is enabled:
//!
//! ```toml
//! [dependencies.pyo3-asyncio]
//! version = "0.20"
//! features = ["unstable-streams"]
//! ```
15
16use std::ops::Deref;
17use std::{future::Future, pin::Pin, sync::Mutex};
18
19use ::tokio::{
20 runtime::{Builder, Runtime},
21 task,
22};
23use once_cell::{
24 sync::{Lazy, OnceCell},
25 unsync::OnceCell as UnsyncOnceCell,
26};
27use pyo3::prelude::*;
28
29use crate::{
30 generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
31 TaskLocals,
32};
33
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Items re-exported so that the tokio macros can name them without the user adding
/// extra dependencies
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// re-export tokio::runtime to build runtimes in tokio macros without additional dependency
    pub use tokio::runtime;
    /// re-export pending to be used in tokio macros without additional dependency
    pub use futures::future::pending;
}
43
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Re-export of `pyo3_asyncio_macros::tokio_main` under the name `main`
#[cfg(feature = "attributes")]
pub use pyo3_asyncio_macros::tokio_main as main;

/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
/// Registers a `tokio` test with the `pyo3-asyncio` test harness
#[cfg(all(feature = "attributes", feature = "testing"))]
pub use pyo3_asyncio_macros::tokio_test as test;
53
54enum Pyo3Runtime {
55 Borrowed(&'static Runtime),
56 Owned(Runtime),
57}
58impl Deref for Pyo3Runtime {
59 type Target = Runtime;
60 fn deref(&self) -> &Self::Target {
61 match self {
62 Self::Borrowed(rt) => rt,
63 Self::Owned(rt) => rt,
64 }
65 }
66}
67
68static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
69static TOKIO_RUNTIME: OnceCell<Pyo3Runtime> = OnceCell::new();
70
71impl generic::JoinError for task::JoinError {
72 fn is_panic(&self) -> bool {
73 task::JoinError::is_panic(self)
74 }
75 fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
76 task::JoinError::into_panic(self)
77 }
78}
79
80struct TokioRuntime;
81
82tokio::task_local! {
83 static TASK_LOCALS: UnsyncOnceCell<TaskLocals>;
84}
85
86impl GenericRuntime for TokioRuntime {
87 type JoinError = task::JoinError;
88 type JoinHandle = task::JoinHandle<()>;
89
90 fn spawn<F>(fut: F) -> Self::JoinHandle
91 where
92 F: Future<Output = ()> + Send + 'static,
93 {
94 get_runtime().spawn(async move {
95 fut.await;
96 })
97 }
98}
99
100impl ContextExt for TokioRuntime {
101 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
102 where
103 F: Future<Output = R> + Send + 'static,
104 {
105 let cell = UnsyncOnceCell::new();
106 cell.set(locals).unwrap();
107
108 Box::pin(TASK_LOCALS.scope(cell, fut))
109 }
110
111 fn get_task_locals() -> Option<TaskLocals> {
112 match TASK_LOCALS.try_with(|c| c.get().map(|locals| locals.clone())) {
113 Ok(locals) => locals,
114 Err(_) => None,
115 }
116 }
117}
118
119impl SpawnLocalExt for TokioRuntime {
120 fn spawn_local<F>(fut: F) -> Self::JoinHandle
121 where
122 F: Future<Output = ()> + 'static,
123 {
124 tokio::task::spawn_local(fut)
125 }
126}
127
128impl LocalContextExt for TokioRuntime {
129 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
130 where
131 F: Future<Output = R> + 'static,
132 {
133 let cell = UnsyncOnceCell::new();
134 cell.set(locals).unwrap();
135
136 Box::pin(TASK_LOCALS.scope(cell, fut))
137 }
138}
139
140/// Set the task local event loop for the given future
141pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
142where
143 F: Future<Output = R> + Send + 'static,
144{
145 TokioRuntime::scope(locals, fut).await
146}
147
148/// Set the task local event loop for the given !Send future
149pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
150where
151 F: Future<Output = R> + 'static,
152{
153 TokioRuntime::scope_local(locals, fut).await
154}
155
156/// Get the current event loop from either Python or Rust async task local context
157///
158/// This function first checks if the runtime has a task-local reference to the Python event loop.
159/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
160/// associated with the current OS thread.
161pub fn get_current_loop(py: Python) -> PyResult<&PyAny> {
162 generic::get_current_loop::<TokioRuntime>(py)
163}
164
165/// Either copy the task locals from the current task OR get the current running loop and
166/// contextvars from Python.
167pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
168 generic::get_current_locals::<TokioRuntime>(py)
169}
170
171/// Initialize the Tokio runtime with a custom build
172pub fn init(builder: Builder) {
173 *TOKIO_BUILDER.lock().unwrap() = builder
174}
175
176/// Initialize the Tokio runtime with a custom Tokio runtime
177///
178/// Returns Ok(()) if success and Err(()) if it had been inited.
179pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
180 TOKIO_RUNTIME
181 .set(Pyo3Runtime::Borrowed(runtime))
182 .map_err(|_| ())
183}
184
185/// Get a reference to the current tokio runtime
186pub fn get_runtime<'a>() -> &'a Runtime {
187 TOKIO_RUNTIME.get_or_init(|| {
188 let rt = TOKIO_BUILDER
189 .lock()
190 .unwrap()
191 .build()
192 .expect("Unable to build Tokio runtime");
193 Pyo3Runtime::Owned(rt)
194 })
195}
196
197fn multi_thread() -> Builder {
198 let mut builder = Builder::new_multi_thread();
199 builder.enable_all();
200 builder
201}
202
203/// Run the event loop until the given Future completes
204///
205/// The event loop runs until the given future is complete.
206///
207/// After this function returns, the event loop can be resumed with [`run_until_complete`]
208///
209/// # Arguments
210/// * `event_loop` - The Python event loop that should run the future
211/// * `fut` - The future to drive to completion
212///
213/// # Examples
214///
215/// ```
216/// # use std::time::Duration;
217/// #
218/// # use pyo3::prelude::*;
219/// #
220/// # pyo3::prepare_freethreaded_python();
221/// # Python::with_gil(|py| -> PyResult<()> {
222/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
223/// pyo3_asyncio::tokio::run_until_complete(event_loop, async move {
224/// tokio::time::sleep(Duration::from_secs(1)).await;
225/// Ok(())
226/// })?;
227/// # Ok(())
228/// # }).unwrap();
229/// ```
230pub fn run_until_complete<F, T>(event_loop: &PyAny, fut: F) -> PyResult<T>
231where
232 F: Future<Output = PyResult<T>> + Send + 'static,
233 T: Send + Sync + 'static,
234{
235 generic::run_until_complete::<TokioRuntime, _, T>(event_loop, fut)
236}
237
238/// Run the event loop until the given Future completes
239///
240/// # Arguments
241/// * `py` - The current PyO3 GIL guard
242/// * `fut` - The future to drive to completion
243///
244/// # Examples
245///
246/// ```no_run
247/// # use std::time::Duration;
248/// #
249/// # use pyo3::prelude::*;
250/// #
251/// fn main() {
252/// Python::with_gil(|py| {
253/// pyo3_asyncio::tokio::run(py, async move {
254/// tokio::time::sleep(Duration::from_secs(1)).await;
255/// Ok(())
256/// })
257/// .map_err(|e| {
258/// e.print_and_set_sys_last_vars(py);
259/// })
260/// .unwrap();
261/// })
262/// }
263/// ```
264pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
265where
266 F: Future<Output = PyResult<T>> + Send + 'static,
267 T: Send + Sync + 'static,
268{
269 generic::run::<TokioRuntime, F, T>(py, fut)
270}
271
272/// Convert a Rust Future into a Python awaitable
273///
274/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
275/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
276///
277/// Python `contextvars` are preserved when calling async Python functions within the Rust future
278/// via [`into_future`] (new behaviour in `v0.15`).
279///
280/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
281/// unfortunately fail to resolve them when called within the Rust future. This is because the
282/// function is being called from a Rust thread, not inside an actual Python coroutine context.
283/// >
284/// > As a workaround, you can get the `contextvars` from the current task locals using
285/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
286/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
287/// synchronous function, and restore the previous context when it returns or raises an exception.
288///
289/// # Arguments
290/// * `py` - PyO3 GIL guard
291/// * `locals` - The task locals for the given future
292/// * `fut` - The Rust future to be converted
293///
294/// # Examples
295///
296/// ```
297/// use std::time::Duration;
298///
299/// use pyo3::prelude::*;
300///
301/// /// Awaitable sleep function
302/// #[pyfunction]
303/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
304/// let secs = secs.extract()?;
305/// pyo3_asyncio::tokio::future_into_py_with_locals(
306/// py,
307/// pyo3_asyncio::tokio::get_current_locals(py)?,
308/// async move {
309/// tokio::time::sleep(Duration::from_secs(secs)).await;
310/// Python::with_gil(|py| Ok(py.None()))
311/// }
312/// )
313/// }
314/// ```
315pub fn future_into_py_with_locals<F, T>(py: Python, locals: TaskLocals, fut: F) -> PyResult<&PyAny>
316where
317 F: Future<Output = PyResult<T>> + Send + 'static,
318 T: IntoPy<PyObject>,
319{
320 generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
321}
322
323/// Convert a Rust Future into a Python awaitable
324///
325/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
326/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
327///
328/// Python `contextvars` are preserved when calling async Python functions within the Rust future
329/// via [`into_future`] (new behaviour in `v0.15`).
330///
331/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
332/// unfortunately fail to resolve them when called within the Rust future. This is because the
333/// function is being called from a Rust thread, not inside an actual Python coroutine context.
334/// >
335/// > As a workaround, you can get the `contextvars` from the current task locals using
336/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
337/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
338/// synchronous function, and restore the previous context when it returns or raises an exception.
339///
340/// # Arguments
341/// * `py` - The current PyO3 GIL guard
342/// * `fut` - The Rust future to be converted
343///
344/// # Examples
345///
346/// ```
347/// use std::time::Duration;
348///
349/// use pyo3::prelude::*;
350///
351/// /// Awaitable sleep function
352/// #[pyfunction]
353/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
354/// let secs = secs.extract()?;
355/// pyo3_asyncio::tokio::future_into_py(py, async move {
356/// tokio::time::sleep(Duration::from_secs(secs)).await;
357/// Ok(())
358/// })
359/// }
360/// ```
361pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
362where
363 F: Future<Output = PyResult<T>> + Send + 'static,
364 T: IntoPy<PyObject>,
365{
366 generic::future_into_py::<TokioRuntime, _, T>(py, fut)
367}
368
369/// Convert a `!Send` Rust Future into a Python awaitable
370///
371/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
372/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
373///
374/// Python `contextvars` are preserved when calling async Python functions within the Rust future
375/// via [`into_future`] (new behaviour in `v0.15`).
376///
377/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
378/// unfortunately fail to resolve them when called within the Rust future. This is because the
379/// function is being called from a Rust thread, not inside an actual Python coroutine context.
380/// >
381/// > As a workaround, you can get the `contextvars` from the current task locals using
382/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
383/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
384/// synchronous function, and restore the previous context when it returns or raises an exception.
385///
386/// # Arguments
387/// * `py` - PyO3 GIL guard
388/// * `locals` - The task locals for the given future
389/// * `fut` - The Rust future to be converted
390///
391/// # Examples
392///
393/// ```
394/// use std::{rc::Rc, time::Duration};
395///
396/// use pyo3::prelude::*;
397///
398/// /// Awaitable non-send sleep function
399/// #[pyfunction]
400/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
401/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py
402/// let secs = Rc::new(secs);
403///
404/// pyo3_asyncio::tokio::local_future_into_py_with_locals(
405/// py,
406/// pyo3_asyncio::tokio::get_current_locals(py)?,
407/// async move {
408/// tokio::time::sleep(Duration::from_secs(*secs)).await;
409/// Python::with_gil(|py| Ok(py.None()))
410/// }
411/// )
412/// }
413///
414/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
415/// #[pyo3_asyncio::tokio::main]
416/// async fn main() -> PyResult<()> {
417/// let locals = Python::with_gil(|py| -> PyResult<_> {
418/// pyo3_asyncio::tokio::get_current_locals(py)
419/// })?;
420///
421/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
422/// // we use spawn_blocking in order to use LocalSet::block_on
423/// tokio::task::spawn_blocking(move || {
424/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
425/// // pyo3_asyncio::tokio::local_future_into_py will panic.
426/// tokio::task::LocalSet::new().block_on(
427/// pyo3_asyncio::tokio::get_runtime(),
428/// pyo3_asyncio::tokio::scope_local(locals, async {
429/// Python::with_gil(|py| {
430/// let py_future = sleep_for(py, 1)?;
431/// pyo3_asyncio::tokio::into_future(py_future)
432/// })?
433/// .await?;
434///
435/// Ok(())
436/// })
437/// )
438/// }).await.unwrap()
439/// }
440/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
441/// # fn main() {}
442/// ```
443#[deprecated(
444 since = "0.18.0",
445 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
446)]
447#[allow(deprecated)]
448pub fn local_future_into_py_with_locals<F, T>(
449 py: Python,
450 locals: TaskLocals,
451 fut: F,
452) -> PyResult<&PyAny>
453where
454 F: Future<Output = PyResult<T>> + 'static,
455 T: IntoPy<PyObject>,
456{
457 generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
458}
459
460/// Convert a `!Send` Rust Future into a Python awaitable
461///
462/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
463/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
464///
465/// Python `contextvars` are preserved when calling async Python functions within the Rust future
466/// via [`into_future`] (new behaviour in `v0.15`).
467///
468/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
469/// unfortunately fail to resolve them when called within the Rust future. This is because the
470/// function is being called from a Rust thread, not inside an actual Python coroutine context.
471/// >
472/// > As a workaround, you can get the `contextvars` from the current task locals using
473/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
474/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
475/// synchronous function, and restore the previous context when it returns or raises an exception.
476///
477/// # Arguments
478/// * `py` - The current PyO3 GIL guard
479/// * `fut` - The Rust future to be converted
480///
481/// # Examples
482///
483/// ```
484/// use std::{rc::Rc, time::Duration};
485///
486/// use pyo3::prelude::*;
487///
488/// /// Awaitable non-send sleep function
489/// #[pyfunction]
490/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
491/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py
492/// let secs = Rc::new(secs);
493/// pyo3_asyncio::tokio::local_future_into_py(py, async move {
494/// tokio::time::sleep(Duration::from_secs(*secs)).await;
495/// Ok(())
496/// })
497/// }
498///
499/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
500/// #[pyo3_asyncio::tokio::main]
501/// async fn main() -> PyResult<()> {
502/// let locals = Python::with_gil(|py| {
503/// pyo3_asyncio::tokio::get_current_locals(py).unwrap()
504/// });
505///
506/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
507/// // we use spawn_blocking in order to use LocalSet::block_on
508/// tokio::task::spawn_blocking(move || {
509/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
510/// // pyo3_asyncio::tokio::local_future_into_py will panic.
511/// tokio::task::LocalSet::new().block_on(
512/// pyo3_asyncio::tokio::get_runtime(),
513/// pyo3_asyncio::tokio::scope_local(locals, async {
514/// Python::with_gil(|py| {
515/// let py_future = sleep_for(py, 1)?;
516/// pyo3_asyncio::tokio::into_future(py_future)
517/// })?
518/// .await?;
519///
520/// Ok(())
521/// })
522/// )
523/// }).await.unwrap()
524/// }
525/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
526/// # fn main() {}
527/// ```
528#[deprecated(
529 since = "0.18.0",
530 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
531)]
532#[allow(deprecated)]
533pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<&PyAny>
534where
535 F: Future<Output = PyResult<T>> + 'static,
536 T: IntoPy<PyObject>,
537{
538 generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
539}
540
541/// Convert a Python `awaitable` into a Rust Future
542///
543/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
544/// completion handler sends the result of this Task through a
545/// `futures::channel::oneshot::Sender<PyResult<PyObject>>` and the future returned by this function
546/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<PyObject>>`.
547///
548/// # Arguments
549/// * `awaitable` - The Python `awaitable` to be converted
550///
551/// # Examples
552///
553/// ```
554/// use std::time::Duration;
555///
556/// use pyo3::prelude::*;
557///
558/// const PYTHON_CODE: &'static str = r#"
559/// import asyncio
560///
561/// async def py_sleep(duration):
562/// await asyncio.sleep(duration)
563/// "#;
564///
565/// async fn py_sleep(seconds: f32) -> PyResult<()> {
566/// let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
567/// Ok(
568/// PyModule::from_code(
569/// py,
570/// PYTHON_CODE,
571/// "test_into_future/test_mod.py",
572/// "test_mod"
573/// )?
574/// .into()
575/// )
576/// })?;
577///
578/// Python::with_gil(|py| {
579/// pyo3_asyncio::tokio::into_future(
580/// test_mod
581/// .call_method1(py, "py_sleep", (seconds.into_py(py),))?
582/// .as_ref(py),
583/// )
584/// })?
585/// .await?;
586/// Ok(())
587/// }
588/// ```
589pub fn into_future(awaitable: &PyAny) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
590 generic::into_future::<TokioRuntime>(awaitable)
591}
592
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// Each item of the resulting stream is a `PyResult<PyObject>`.
///
/// # Arguments
/// * `locals` - The current task locals
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_asyncio::tokio::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         TEST_MOD,
///         "test_rust_coroutine/test_mod.py",
///         "test_mod",
///     )?;
///
///     pyo3_asyncio::tokio::into_stream_with_locals_v1(
///         pyo3_asyncio::tokio::get_current_locals(py)?,
///         test_mod.call_method0("gen")?
///     )
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1<'p>(
    locals: TaskLocals,
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Tokio-specific instantiation of the runtime-agnostic implementation.
    generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}
653
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// Each item of the resulting stream is a `PyResult<PyObject>`.
///
/// # Arguments
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_asyncio::tokio::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         TEST_MOD,
///         "test_rust_coroutine/test_mod.py",
///         "test_mod",
///     )?;
///
///     pyo3_asyncio::tokio::into_stream_v1(test_mod.call_method0("gen")?)
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<'p>(
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
    // Tokio-specific instantiation of the runtime-agnostic implementation.
    generic::into_stream_v1::<TokioRuntime>(gen)
}
709
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// Each item of the resulting stream is a plain `PyObject`.
///
/// # Arguments
/// * `locals` - The current task locals
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_asyncio::tokio::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         TEST_MOD,
///         "test_rust_coroutine/test_mod.py",
///         "test_mod",
///     )?;
///
///     pyo3_asyncio::tokio::into_stream_with_locals_v2(
///         pyo3_asyncio::tokio::get_current_locals(py)?,
///         test_mod.call_method0("gen")?
///     )
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2<'p>(
    locals: TaskLocals,
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Tokio-specific instantiation of the runtime-agnostic implementation.
    generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}
770
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
///
/// **This API is marked as unstable** and is only available when the
/// `unstable-streams` crate feature is enabled. This comes with no
/// stability guarantees, and could be changed or removed at any time.
///
/// Each item of the resulting stream is a plain `PyObject`.
///
/// # Arguments
/// * `gen` - The Python async generator to be converted
///
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
///
/// const TEST_MOD: &str = r#"
/// import asyncio
///
/// async def gen():
///     for i in range(10):
///         await asyncio.sleep(0.1)
///         yield i
/// "#;
///
/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
/// # #[pyo3_asyncio::tokio::main]
/// # async fn main() -> PyResult<()> {
/// let stream = Python::with_gil(|py| {
///     let test_mod = PyModule::from_code(
///         py,
///         TEST_MOD,
///         "test_rust_coroutine/test_mod.py",
///         "test_mod",
///     )?;
///
///     pyo3_asyncio::tokio::into_stream_v2(test_mod.call_method0("gen")?)
/// })?;
///
/// let vals = stream
///     .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
///     .try_collect::<Vec<i32>>()
///     .await?;
///
/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
///
/// Ok(())
/// # }
/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
/// # fn main() {}
/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<'p>(
    gen: &'p PyAny,
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
    // Tokio-specific instantiation of the runtime-agnostic implementation.
    generic::into_stream_v2::<TokioRuntime>(gen)
}