pyo3_async_runtimes/tokio.rs
1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! >are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-async-runtimes]
12//! version = "0.24"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::cell::OnceCell;
17use std::ops::Deref;
18use std::sync::OnceLock;
19use std::{future::Future, pin::Pin, sync::Mutex};
20
21use ::tokio::{
22 runtime::{Builder, Runtime},
23 task,
24};
25use once_cell::sync::Lazy;
26use pyo3::prelude::*;
27
28use crate::{
29 generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
30 TaskLocals,
31};
32
/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
/// Items referenced by the tokio macros, re-exported here so that macro users do not have to
/// declare the underlying crates as dependencies themselves.
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// Re-export of `futures::future::pending` for use inside the tokio macros.
    pub use futures::future::pending;
    /// Re-export of `tokio::runtime` so the tokio macros can build runtimes.
    pub use tokio::runtime;
}
42
43/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
44#[cfg(feature = "attributes")]
45pub use pyo3_async_runtimes_macros::tokio_main as main;
46
47/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
48/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
/// Registers a `tokio` test with the `pyo3-async-runtimes` test harness
50#[cfg(all(feature = "attributes", feature = "testing"))]
51pub use pyo3_async_runtimes_macros::tokio_test as test;
52
53enum Pyo3Runtime {
54 Borrowed(&'static Runtime),
55 Owned(Runtime),
56}
57impl Deref for Pyo3Runtime {
58 type Target = Runtime;
59 fn deref(&self) -> &Self::Target {
60 match self {
61 Self::Borrowed(rt) => rt,
62 Self::Owned(rt) => rt,
63 }
64 }
65}
66
67static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
68static TOKIO_RUNTIME: OnceLock<Pyo3Runtime> = OnceLock::new();
69
70impl generic::JoinError for task::JoinError {
71 fn is_panic(&self) -> bool {
72 task::JoinError::is_panic(self)
73 }
74 fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
75 task::JoinError::into_panic(self)
76 }
77}
78
79struct TokioRuntime;
80
81tokio::task_local! {
82 static TASK_LOCALS: OnceCell<TaskLocals>;
83}
84
85impl GenericRuntime for TokioRuntime {
86 type JoinError = task::JoinError;
87 type JoinHandle = task::JoinHandle<()>;
88
89 fn spawn<F>(fut: F) -> Self::JoinHandle
90 where
91 F: Future<Output = ()> + Send + 'static,
92 {
93 get_runtime().spawn(async move {
94 fut.await;
95 })
96 }
97}
98
99impl ContextExt for TokioRuntime {
100 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
101 where
102 F: Future<Output = R> + Send + 'static,
103 {
104 let cell = OnceCell::new();
105 cell.set(locals).unwrap();
106
107 Box::pin(TASK_LOCALS.scope(cell, fut))
108 }
109
110 fn get_task_locals() -> Option<TaskLocals> {
111 TASK_LOCALS
112 .try_with(|c| {
113 c.get()
114 .map(|locals| Python::attach(|py| locals.clone_ref(py)))
115 })
116 .unwrap_or_default()
117 }
118}
119
120impl SpawnLocalExt for TokioRuntime {
121 fn spawn_local<F>(fut: F) -> Self::JoinHandle
122 where
123 F: Future<Output = ()> + 'static,
124 {
125 tokio::task::spawn_local(fut)
126 }
127}
128
129impl LocalContextExt for TokioRuntime {
130 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
131 where
132 F: Future<Output = R> + 'static,
133 {
134 let cell = OnceCell::new();
135 cell.set(locals).unwrap();
136
137 Box::pin(TASK_LOCALS.scope(cell, fut))
138 }
139}
140
141/// Set the task local event loop for the given future
142pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
143where
144 F: Future<Output = R> + Send + 'static,
145{
146 TokioRuntime::scope(locals, fut).await
147}
148
149/// Set the task local event loop for the given !Send future
150pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
151where
152 F: Future<Output = R> + 'static,
153{
154 TokioRuntime::scope_local(locals, fut).await
155}
156
157/// Get the current event loop from either Python or Rust async task local context
158///
159/// This function first checks if the runtime has a task-local reference to the Python event loop.
160/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
161/// associated with the current OS thread.
162pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
163 generic::get_current_loop::<TokioRuntime>(py)
164}
165
166/// Either copy the task locals from the current task OR get the current running loop and
167/// contextvars from Python.
168pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
169 generic::get_current_locals::<TokioRuntime>(py)
170}
171
172/// Initialize the Tokio runtime with a custom build
173pub fn init(builder: Builder) {
174 *TOKIO_BUILDER.lock().unwrap() = builder
175}
176
177/// Initialize the Tokio runtime with a custom Tokio runtime
178///
179/// Returns Ok(()) if success and Err(()) if it had been inited.
180#[allow(clippy::result_unit_err)]
181pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
182 TOKIO_RUNTIME
183 .set(Pyo3Runtime::Borrowed(runtime))
184 .map_err(|_| ())
185}
186
187/// Get a reference to the current tokio runtime
188pub fn get_runtime<'a>() -> &'a Runtime {
189 TOKIO_RUNTIME.get_or_init(|| {
190 let rt = TOKIO_BUILDER
191 .lock()
192 .unwrap()
193 .build()
194 .expect("Unable to build Tokio runtime");
195 Pyo3Runtime::Owned(rt)
196 })
197}
198
199fn multi_thread() -> Builder {
200 let mut builder = Builder::new_multi_thread();
201 builder.enable_all();
202 builder
203}
204
205/// Run the event loop until the given Future completes
206///
207/// The event loop runs until the given future is complete.
208///
209/// After this function returns, the event loop can be resumed with [`run_until_complete`]
210///
211/// # Arguments
212/// * `event_loop` - The Python event loop that should run the future
213/// * `fut` - The future to drive to completion
214///
215/// # Examples
216///
217/// ```
218/// # use std::time::Duration;
219/// #
220/// # use pyo3::prelude::*;
221/// #
222/// # Python::initialize();
223/// # Python::attach(|py| -> PyResult<()> {
224/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
225/// pyo3_async_runtimes::tokio::run_until_complete(event_loop, async move {
226/// tokio::time::sleep(Duration::from_secs(1)).await;
227/// Ok(())
228/// })?;
229/// # Ok(())
230/// # }).unwrap();
231/// ```
232pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
233where
234 F: Future<Output = PyResult<T>> + Send + 'static,
235 T: Send + Sync + 'static,
236{
237 generic::run_until_complete::<TokioRuntime, _, T>(&event_loop, fut)
238}
239
240/// Run the event loop until the given Future completes
241///
242/// # Arguments
243/// * `py` - The current PyO3 GIL guard
244/// * `fut` - The future to drive to completion
245///
246/// # Examples
247///
248/// ```no_run
249/// # use std::time::Duration;
250/// #
251/// # use pyo3::prelude::*;
252/// #
253/// fn main() {
254/// Python::attach(|py| {
255/// pyo3_async_runtimes::tokio::run(py, async move {
256/// tokio::time::sleep(Duration::from_secs(1)).await;
257/// Ok(())
258/// })
259/// .map_err(|e| {
260/// e.print_and_set_sys_last_vars(py);
261/// })
262/// .unwrap();
263/// })
264/// }
265/// ```
266pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
267where
268 F: Future<Output = PyResult<T>> + Send + 'static,
269 T: Send + Sync + 'static,
270{
271 generic::run::<TokioRuntime, F, T>(py, fut)
272}
273
274/// Convert a Rust Future into a Python awaitable
275///
276/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
277/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
278///
279/// Python `contextvars` are preserved when calling async Python functions within the Rust future
280/// via [`into_future`] (new behaviour in `v0.15`).
281///
282/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
283/// > unfortunately fail to resolve them when called within the Rust future. This is because the
284/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
285/// >
286/// > As a workaround, you can get the `contextvars` from the current task locals using
287/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
288/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
289/// > synchronous function, and restore the previous context when it returns or raises an exception.
290///
291/// # Arguments
292/// * `py` - PyO3 GIL guard
293/// * `locals` - The task locals for the given future
294/// * `fut` - The Rust future to be converted
295///
296/// # Examples
297///
298/// ```
299/// use std::time::Duration;
300///
301/// use pyo3::prelude::*;
302///
303/// /// Awaitable sleep function
304/// #[pyfunction]
305/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
306/// let secs = secs.extract()?;
307/// pyo3_async_runtimes::tokio::future_into_py_with_locals(
308/// py,
309/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
310/// async move {
311/// tokio::time::sleep(Duration::from_secs(secs)).await;
312/// Python::attach(|py| Ok(py.None()))
313/// }
314/// )
315/// }
316/// ```
317pub fn future_into_py_with_locals<F, T>(
318 py: Python,
319 locals: TaskLocals,
320 fut: F,
321) -> PyResult<Bound<PyAny>>
322where
323 F: Future<Output = PyResult<T>> + Send + 'static,
324 T: for<'py> IntoPyObject<'py>,
325{
326 generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
327}
328
329/// Convert a Rust Future into a Python awaitable
330///
331/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
332/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
333///
334/// Python `contextvars` are preserved when calling async Python functions within the Rust future
335/// via [`into_future`] (new behaviour in `v0.15`).
336///
337/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
338/// > unfortunately fail to resolve them when called within the Rust future. This is because the
339/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
340/// >
341/// > As a workaround, you can get the `contextvars` from the current task locals using
342/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
343/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
344/// > synchronous function, and restore the previous context when it returns or raises an exception.
345///
346/// # Arguments
347/// * `py` - The current PyO3 GIL guard
348/// * `fut` - The Rust future to be converted
349///
350/// # Examples
351///
352/// ```
353/// use std::time::Duration;
354///
355/// use pyo3::prelude::*;
356///
357/// /// Awaitable sleep function
358/// #[pyfunction]
359/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
360/// let secs = secs.extract()?;
361/// pyo3_async_runtimes::tokio::future_into_py(py, async move {
362/// tokio::time::sleep(Duration::from_secs(secs)).await;
363/// Ok(())
364/// })
365/// }
366/// ```
367pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
368where
369 F: Future<Output = PyResult<T>> + Send + 'static,
370 T: for<'py> IntoPyObject<'py>,
371{
372 generic::future_into_py::<TokioRuntime, _, T>(py, fut)
373}
374
375/// Convert a `!Send` Rust Future into a Python awaitable
376///
377/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
378/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
379///
380/// Python `contextvars` are preserved when calling async Python functions within the Rust future
381/// via [`into_future`] (new behaviour in `v0.15`).
382///
383/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
384/// > unfortunately fail to resolve them when called within the Rust future. This is because the
385/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
386/// >
387/// > As a workaround, you can get the `contextvars` from the current task locals using
388/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
389/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
390/// > synchronous function, and restore the previous context when it returns or raises an exception.
391///
392/// # Arguments
393/// * `py` - PyO3 GIL guard
394/// * `locals` - The task locals for the given future
395/// * `fut` - The Rust future to be converted
396///
397/// # Examples
398///
399/// ```
400/// use std::{rc::Rc, time::Duration};
401///
402/// use pyo3::prelude::*;
403///
404/// /// Awaitable non-send sleep function
405/// #[pyfunction]
406/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
407/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
408/// let secs = Rc::new(secs);
409///
410/// pyo3_async_runtimes::tokio::local_future_into_py_with_locals(
411/// py,
412/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
413/// async move {
414/// tokio::time::sleep(Duration::from_secs(*secs)).await;
415/// Python::attach(|py| Ok(py.None()))
416/// }
417/// )
418/// }
419///
420/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
421/// #[pyo3_async_runtimes::tokio::main]
422/// async fn main() -> PyResult<()> {
423/// let locals = Python::attach(|py| -> PyResult<_> {
424/// pyo3_async_runtimes::tokio::get_current_locals(py)
425/// })?;
426///
427/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
428/// // we use spawn_blocking in order to use LocalSet::block_on
429/// tokio::task::spawn_blocking(move || {
430/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
431/// // pyo3_async_runtimes::tokio::local_future_into_py will panic.
432/// tokio::task::LocalSet::new().block_on(
433/// pyo3_async_runtimes::tokio::get_runtime(),
434/// pyo3_async_runtimes::tokio::scope_local(locals, async {
435/// Python::attach(|py| {
436/// let py_future = sleep_for(py, 1)?;
437/// pyo3_async_runtimes::tokio::into_future(py_future)
438/// })?
439/// .await?;
440///
441/// Ok(())
442/// })
443/// )
444/// }).await.unwrap()
445/// }
446/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
447/// # fn main() {}
448/// ```
449#[deprecated(
450 since = "0.18.0",
451 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
452)]
453#[allow(deprecated)]
454pub fn local_future_into_py_with_locals<F, T>(
455 py: Python,
456 locals: TaskLocals,
457 fut: F,
458) -> PyResult<Bound<PyAny>>
459where
460 F: Future<Output = PyResult<T>> + 'static,
461 T: for<'py> IntoPyObject<'py>,
462{
463 generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
464}
465
466/// Convert a `!Send` Rust Future into a Python awaitable
467///
468/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
469/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
470///
471/// Python `contextvars` are preserved when calling async Python functions within the Rust future
472/// via [`into_future`] (new behaviour in `v0.15`).
473///
474/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
475/// > unfortunately fail to resolve them when called within the Rust future. This is because the
476/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
477/// >
478/// > As a workaround, you can get the `contextvars` from the current task locals using
479/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
480/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
481/// > synchronous function, and restore the previous context when it returns or raises an exception.
482///
483/// # Arguments
484/// * `py` - The current PyO3 GIL guard
485/// * `fut` - The Rust future to be converted
486///
487/// # Examples
488///
489/// ```
490/// use std::{rc::Rc, time::Duration};
491///
492/// use pyo3::prelude::*;
493///
494/// /// Awaitable non-send sleep function
495/// #[pyfunction]
496/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
497/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
498/// let secs = Rc::new(secs);
499/// pyo3_async_runtimes::tokio::local_future_into_py(py, async move {
500/// tokio::time::sleep(Duration::from_secs(*secs)).await;
501/// Ok(())
502/// })
503/// }
504///
505/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
506/// #[pyo3_async_runtimes::tokio::main]
507/// async fn main() -> PyResult<()> {
508/// let locals = Python::attach(|py| {
509/// pyo3_async_runtimes::tokio::get_current_locals(py).unwrap()
510/// });
511///
512/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
513/// // we use spawn_blocking in order to use LocalSet::block_on
514/// tokio::task::spawn_blocking(move || {
515/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
516/// // pyo3_async_runtimes::tokio::local_future_into_py will panic.
517/// tokio::task::LocalSet::new().block_on(
518/// pyo3_async_runtimes::tokio::get_runtime(),
519/// pyo3_async_runtimes::tokio::scope_local(locals, async {
520/// Python::attach(|py| {
521/// let py_future = sleep_for(py, 1)?;
522/// pyo3_async_runtimes::tokio::into_future(py_future)
523/// })?
524/// .await?;
525///
526/// Ok(())
527/// })
528/// )
529/// }).await.unwrap()
530/// }
531/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
532/// # fn main() {}
533/// ```
534#[deprecated(
535 since = "0.18.0",
536 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
537)]
538#[allow(deprecated)]
539pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
540where
541 F: Future<Output = PyResult<T>> + 'static,
542 T: for<'py> IntoPyObject<'py>,
543{
544 generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
545}
546
547/// Convert a Python `awaitable` into a Rust Future
548///
549/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
550/// completion handler sends the result of this Task through a
551/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
552/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
553///
554/// # Arguments
555/// * `awaitable` - The Python `awaitable` to be converted
556///
557/// # Examples
558///
559/// ```
560/// use std::time::Duration;
561/// use std::ffi::CString;
562///
563/// use pyo3::prelude::*;
564///
565/// const PYTHON_CODE: &'static str = r#"
566/// import asyncio
567///
568/// async def py_sleep(duration):
569/// await asyncio.sleep(duration)
570/// "#;
571///
572/// async fn py_sleep(seconds: f32) -> PyResult<()> {
573/// let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
574/// Ok(
575/// PyModule::from_code(
576/// py,
577/// &CString::new(PYTHON_CODE).unwrap(),
578/// &CString::new("test_into_future/test_mod.py").unwrap(),
579/// &CString::new("test_mod").unwrap(),
580/// )?
581/// .into()
582/// )
583/// })?;
584///
585/// Python::attach(|py| {
586/// pyo3_async_runtimes::tokio::into_future(
587/// test_mod
588/// .call_method1(py, "py_sleep", (seconds,))?
589/// .into_bound(py),
590/// )
591/// })?
592/// .await?;
593/// Ok(())
594/// }
595/// ```
596pub fn into_future(
597 awaitable: Bound<PyAny>,
598) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
599 generic::into_future::<TokioRuntime>(awaitable)
600}
601
602/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
603///
604/// **This API is marked as unstable** and is only available when the
605/// `unstable-streams` crate feature is enabled. This comes with no
606/// stability guarantees, and could be changed or removed at any time.
607///
608/// # Arguments
609/// * `locals` - The current task locals
610/// * `gen` - The Python async generator to be converted
611///
612/// # Examples
613/// ```
614/// use pyo3::prelude::*;
615/// use futures::{StreamExt, TryStreamExt};
616/// use std::ffi::CString;
617///
618/// const TEST_MOD: &str = r#"
619/// import asyncio
620///
621/// async def gen():
622/// for i in range(10):
623/// await asyncio.sleep(0.1)
624/// yield i
625/// "#;
626///
627/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
628/// # #[pyo3_async_runtimes::tokio::main]
629/// # async fn main() -> PyResult<()> {
630/// let stream = Python::attach(|py| {
631/// let test_mod = PyModule::from_code(
632/// py,
633/// &CString::new(TEST_MOD).unwrap(),
634/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
635/// &CString::new("test_mod").unwrap(),
636/// )?;
637///
638/// pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
639/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
640/// test_mod.call_method0("gen")?
641/// )
642/// })?;
643///
644/// let vals = stream
645/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
646/// .try_collect::<Vec<i32>>()
647/// .await?;
648///
649/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
650///
651/// Ok(())
652/// # }
653/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
654/// # fn main() {}
655/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin wrapper over the runtime-agnostic conversion, with Tokio selected as the runtime.
    let stream = generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)?;
    Ok(stream)
}
663
664/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
665///
666/// **This API is marked as unstable** and is only available when the
667/// `unstable-streams` crate feature is enabled. This comes with no
668/// stability guarantees, and could be changed or removed at any time.
669///
670/// # Arguments
671/// * `gen` - The Python async generator to be converted
672///
673/// # Examples
674/// ```
675/// use pyo3::prelude::*;
676/// use futures::{StreamExt, TryStreamExt};
677/// use std::ffi::CString;
678///
679/// const TEST_MOD: &str = r#"
680/// import asyncio
681///
682/// async def gen():
683/// for i in range(10):
684/// await asyncio.sleep(0.1)
685/// yield i
686/// "#;
687///
688/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
689/// # #[pyo3_async_runtimes::tokio::main]
690/// # async fn main() -> PyResult<()> {
691/// let stream = Python::attach(|py| {
692/// let test_mod = PyModule::from_code(
693/// py,
694/// &CString::new(TEST_MOD).unwrap(),
695/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
696/// &CString::new("test_mod").unwrap(),
697/// )?;
698///
699/// pyo3_async_runtimes::tokio::into_stream_v1(test_mod.call_method0("gen")?)
700/// })?;
701///
702/// let vals = stream
703/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
704/// .try_collect::<Vec<i32>>()
705/// .await?;
706///
707/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
708///
709/// Ok(())
710/// # }
711/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
712/// # fn main() {}
713/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin wrapper over the runtime-agnostic conversion, with Tokio selected as the runtime.
    let stream = generic::into_stream_v1::<TokioRuntime>(gen)?;
    Ok(stream)
}
720
721/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
722///
723/// **This API is marked as unstable** and is only available when the
724/// `unstable-streams` crate feature is enabled. This comes with no
725/// stability guarantees, and could be changed or removed at any time.
726///
727/// # Arguments
728/// * `locals` - The current task locals
729/// * `gen` - The Python async generator to be converted
730///
731/// # Examples
732/// ```
733/// use pyo3::prelude::*;
734/// use futures::{StreamExt, TryStreamExt};
735/// use std::ffi::CString;
736///
737/// const TEST_MOD: &str = r#"
738/// import asyncio
739///
740/// async def gen():
741/// for i in range(10):
742/// await asyncio.sleep(0.1)
743/// yield i
744/// "#;
745///
746/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
747/// # #[pyo3_async_runtimes::tokio::main]
748/// # async fn main() -> PyResult<()> {
749/// let stream = Python::attach(|py| {
750/// let test_mod = PyModule::from_code(
751/// py,
752/// &CString::new(TEST_MOD).unwrap(),
753/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
754/// &CString::new("test_mod").unwrap(),
755/// )?;
756///
757/// pyo3_async_runtimes::tokio::into_stream_with_locals_v2(
758/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
759/// test_mod.call_method0("gen")?
760/// )
761/// })?;
762///
763/// let vals = stream
764/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
765/// .try_collect::<Vec<i32>>()
766/// .await?;
767///
768/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
769///
770/// Ok(())
771/// # }
772/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
773/// # fn main() {}
774/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin wrapper over the runtime-agnostic conversion, with Tokio selected as the runtime.
    let stream = generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)?;
    Ok(stream)
}
782
783/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
784///
785/// **This API is marked as unstable** and is only available when the
786/// `unstable-streams` crate feature is enabled. This comes with no
787/// stability guarantees, and could be changed or removed at any time.
788///
789/// # Arguments
790/// * `gen` - The Python async generator to be converted
791///
792/// # Examples
793/// ```
794/// use pyo3::prelude::*;
795/// use futures::{StreamExt, TryStreamExt};
796/// use std::ffi::CString;
797///
798/// const TEST_MOD: &str = r#"
799/// import asyncio
800///
801/// async def gen():
802/// for i in range(10):
803/// await asyncio.sleep(0.1)
804/// yield i
805/// "#;
806///
807/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
808/// # #[pyo3_async_runtimes::tokio::main]
809/// # async fn main() -> PyResult<()> {
810/// let stream = Python::attach(|py| {
811/// let test_mod = PyModule::from_code(
812/// py,
813/// &CString::new(TEST_MOD).unwrap(),
814/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
815/// &CString::new("test_mod").unwrap(),
816/// )?;
817///
818/// pyo3_async_runtimes::tokio::into_stream_v2(test_mod.call_method0("gen")?)
819/// })?;
820///
821/// let vals = stream
822/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
823/// .try_collect::<Vec<i32>>()
824/// .await?;
825///
826/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
827///
828/// Ok(())
829/// # }
830/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
831/// # fn main() {}
832/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin wrapper over the runtime-agnostic conversion, with Tokio selected as the runtime.
    let stream = generic::into_stream_v2::<TokioRuntime>(gen)?;
    Ok(stream)
}