pyo3_async_runtimes/tokio.rs
1//! <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>tokio-runtime</code></span> PyO3 Asyncio functions specific to the tokio runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-async-runtimes]
12//! version = "0.24"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::cell::OnceCell;
17use std::ops::Deref;
18use std::sync::OnceLock;
19use std::{future::Future, pin::Pin, sync::Mutex};
20
21use ::tokio::{
22 runtime::{Builder, Runtime},
23 task,
24};
25use once_cell::sync::Lazy;
26use pyo3::prelude::*;
27
28use crate::{
29 generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
30 TaskLocals,
31};
32
33/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
34/// re-exports for macros
#[cfg(feature = "attributes")]
pub mod re_exports {
    /// Re-export of `futures::future::pending` so that it can be used in the tokio macros
    /// without requiring an additional dependency
    pub use futures::future::pending;
    /// Re-export of `tokio::runtime` so that the tokio macros can build runtimes without
    /// requiring an additional dependency
    pub use tokio::runtime;
}
42
43/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
44#[cfg(feature = "attributes")]
45pub use pyo3_async_runtimes_macros::tokio_main as main;
46
47/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span>
48/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span>
/// Registers a `tokio` test with the `pyo3-async-runtimes` test harness
50#[cfg(all(feature = "attributes", feature = "testing"))]
51pub use pyo3_async_runtimes_macros::tokio_test as test;
52
53enum Pyo3Runtime {
54 Borrowed(&'static Runtime),
55 Owned(Runtime),
56}
57impl Deref for Pyo3Runtime {
58 type Target = Runtime;
59 fn deref(&self) -> &Self::Target {
60 match self {
61 Self::Borrowed(rt) => rt,
62 Self::Owned(rt) => rt,
63 }
64 }
65}
66
// Builder from which `get_runtime` constructs the runtime when none has been set yet. Starts out
// as `multi_thread()` and can be replaced through `init`; it is only consulted when
// `get_runtime` actually has to build a runtime.
static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
// Write-once slot for the runtime: filled either by `init_with_runtime` (borrowed runtime) or by
// the first `get_runtime` call that finds it empty (owned runtime).
static TOKIO_RUNTIME: OnceLock<Pyo3Runtime> = OnceLock::new();
69
// Implements the runtime-agnostic `generic::JoinError` trait for Tokio's join error by forwarding
// to the methods of the same name on `task::JoinError`.
impl generic::JoinError for task::JoinError {
    // The explicit `task::JoinError::` path makes it obvious that the call targets the type's own
    // method rather than recursing into this trait method.
    fn is_panic(&self) -> bool {
        task::JoinError::is_panic(self)
    }
    // Consumes the error and hands back the panic payload, again via the type's own method.
    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
        task::JoinError::into_panic(self)
    }
}
78
// Marker type on which the `generic` runtime traits are implemented for Tokio.
struct TokioRuntime;

tokio::task_local! {
    // Task-local cell holding the `TaskLocals` of the running task; installed by
    // `scope`/`scope_local` and read back by `get_task_locals`.
    static TASK_LOCALS: OnceCell<TaskLocals>;
}
84
85impl GenericRuntime for TokioRuntime {
86 type JoinError = task::JoinError;
87 type JoinHandle = task::JoinHandle<()>;
88
89 fn spawn<F>(fut: F) -> Self::JoinHandle
90 where
91 F: Future<Output = ()> + Send + 'static,
92 {
93 get_runtime().spawn(async move {
94 fut.await;
95 })
96 }
97
98 fn spawn_blocking<F>(f: F) -> Self::JoinHandle
99 where
100 F: FnOnce() + Send + 'static,
101 {
102 get_runtime().spawn_blocking(f)
103 }
104}
105
106impl ContextExt for TokioRuntime {
107 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
108 where
109 F: Future<Output = R> + Send + 'static,
110 {
111 let cell = OnceCell::new();
112 cell.set(locals).unwrap();
113
114 Box::pin(TASK_LOCALS.scope(cell, fut))
115 }
116
117 fn get_task_locals() -> Option<TaskLocals> {
118 TASK_LOCALS
119 .try_with(|c| c.get().map(|locals| locals.clone()))
120 .unwrap_or_default()
121 }
122}
123
124impl SpawnLocalExt for TokioRuntime {
125 fn spawn_local<F>(fut: F) -> Self::JoinHandle
126 where
127 F: Future<Output = ()> + 'static,
128 {
129 tokio::task::spawn_local(fut)
130 }
131}
132
133impl LocalContextExt for TokioRuntime {
134 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
135 where
136 F: Future<Output = R> + 'static,
137 {
138 let cell = OnceCell::new();
139 cell.set(locals).unwrap();
140
141 Box::pin(TASK_LOCALS.scope(cell, fut))
142 }
143}
144
145/// Set the task local event loop for the given future
146pub async fn scope<F, R>(locals: TaskLocals, fut: F) -> R
147where
148 F: Future<Output = R> + Send + 'static,
149{
150 TokioRuntime::scope(locals, fut).await
151}
152
153/// Set the task local event loop for the given !Send future
154pub async fn scope_local<F, R>(locals: TaskLocals, fut: F) -> R
155where
156 F: Future<Output = R> + 'static,
157{
158 TokioRuntime::scope_local(locals, fut).await
159}
160
/// Get the current event loop from either Python or Rust async task local context
///
/// This function first checks if the runtime has a task-local reference to the Python event loop.
/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop
/// associated with the current OS thread.
pub fn get_current_loop(py: Python) -> PyResult<Bound<PyAny>> {
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::get_current_loop::<TokioRuntime>(py)
}
169
/// Either copy the task locals from the current task OR get the current running loop and
/// contextvars from Python.
pub fn get_current_locals(py: Python) -> PyResult<TaskLocals> {
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::get_current_locals::<TokioRuntime>(py)
}
175
176/// Initialize the Tokio runtime with a custom build
177pub fn init(builder: Builder) {
178 *TOKIO_BUILDER.lock().unwrap() = builder
179}
180
181/// Initialize the Tokio runtime with a custom Tokio runtime
182///
183/// Returns Ok(()) if success and Err(()) if it had been inited.
184#[allow(clippy::result_unit_err)]
185pub fn init_with_runtime(runtime: &'static Runtime) -> Result<(), ()> {
186 TOKIO_RUNTIME
187 .set(Pyo3Runtime::Borrowed(runtime))
188 .map_err(|_| ())
189}
190
191/// Get a reference to the current tokio runtime
192pub fn get_runtime<'a>() -> &'a Runtime {
193 TOKIO_RUNTIME.get_or_init(|| {
194 let rt = TOKIO_BUILDER
195 .lock()
196 .unwrap()
197 .build()
198 .expect("Unable to build Tokio runtime");
199 Pyo3Runtime::Owned(rt)
200 })
201}
202
203fn multi_thread() -> Builder {
204 let mut builder = Builder::new_multi_thread();
205 builder.enable_all();
206 builder
207}
208
209/// Run the event loop until the given Future completes
210///
211/// The event loop runs until the given future is complete.
212///
/// After this function returns, the event loop can be resumed with [`run_until_complete`].
214///
215/// # Arguments
216/// * `event_loop` - The Python event loop that should run the future
217/// * `fut` - The future to drive to completion
218///
219/// # Examples
220///
221/// ```
222/// # use std::time::Duration;
223/// #
224/// # use pyo3::prelude::*;
225/// #
226/// # Python::initialize();
227/// # Python::attach(|py| -> PyResult<()> {
228/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
229/// pyo3_async_runtimes::tokio::run_until_complete(event_loop, async move {
230/// tokio::time::sleep(Duration::from_secs(1)).await;
231/// Ok(())
232/// })?;
233/// # Ok(())
234/// # }).unwrap();
235/// ```
pub fn run_until_complete<F, T>(event_loop: Bound<PyAny>, fut: F) -> PyResult<T>
where
    F: Future<Output = PyResult<T>> + Send + 'static,
    T: Send + Sync + 'static,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter. The event loop is taken by value
    // here and passed on by reference.
    generic::run_until_complete::<TokioRuntime, _, T>(&event_loop, fut)
}
243
244/// Run the event loop until the given Future completes
245///
246/// # Arguments
247/// * `py` - The current PyO3 GIL guard
248/// * `fut` - The future to drive to completion
249///
250/// # Examples
251///
252/// ```no_run
253/// # use std::time::Duration;
254/// #
255/// # use pyo3::prelude::*;
256/// #
257/// fn main() {
258/// Python::attach(|py| {
259/// pyo3_async_runtimes::tokio::run(py, async move {
260/// tokio::time::sleep(Duration::from_secs(1)).await;
261/// Ok(())
262/// })
263/// .map_err(|e| {
264/// e.print_and_set_sys_last_vars(py);
265/// })
266/// .unwrap();
267/// })
268/// }
269/// ```
pub fn run<F, T>(py: Python, fut: F) -> PyResult<T>
where
    F: Future<Output = PyResult<T>> + Send + 'static,
    T: Send + Sync + 'static,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::run::<TokioRuntime, F, T>(py, fut)
}
277
278/// Convert a Rust Future into a Python awaitable
279///
280/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
281/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
282///
283/// Python `contextvars` are preserved when calling async Python functions within the Rust future
284/// via [`into_future`] (new behaviour in `v0.15`).
285///
286/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
287/// > unfortunately fail to resolve them when called within the Rust future. This is because the
288/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
289/// >
290/// > As a workaround, you can get the `contextvars` from the current task locals using
291/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
292/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
293/// > synchronous function, and restore the previous context when it returns or raises an exception.
294///
295/// # Arguments
296/// * `py` - PyO3 GIL guard
297/// * `locals` - The task locals for the given future
298/// * `fut` - The Rust future to be converted
299///
300/// # Examples
301///
302/// ```
303/// use std::time::Duration;
304///
305/// use pyo3::prelude::*;
306///
307/// /// Awaitable sleep function
308/// #[pyfunction]
309/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
310/// let secs = secs.extract()?;
311/// pyo3_async_runtimes::tokio::future_into_py_with_locals(
312/// py,
313/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
314/// async move {
315/// tokio::time::sleep(Duration::from_secs(secs)).await;
316/// Python::attach(|py| Ok(py.None()))
317/// }
318/// )
319/// }
320/// ```
pub fn future_into_py_with_locals<F, T>(
    py: Python,
    locals: TaskLocals,
    fut: F,
) -> PyResult<Bound<PyAny>>
where
    F: Future<Output = PyResult<T>> + Send + 'static,
    T: for<'py> IntoPyObject<'py> + Send + 'static,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::future_into_py_with_locals::<TokioRuntime, F, T>(py, locals, fut)
}
332
333/// Convert a Rust Future into a Python awaitable
334///
335/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
336/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
337///
338/// Python `contextvars` are preserved when calling async Python functions within the Rust future
339/// via [`into_future`] (new behaviour in `v0.15`).
340///
341/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
342/// > unfortunately fail to resolve them when called within the Rust future. This is because the
343/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
344/// >
345/// > As a workaround, you can get the `contextvars` from the current task locals using
346/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
347/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
348/// > synchronous function, and restore the previous context when it returns or raises an exception.
349///
350/// # Arguments
351/// * `py` - The current PyO3 GIL guard
352/// * `fut` - The Rust future to be converted
353///
354/// # Examples
355///
356/// ```
357/// use std::time::Duration;
358///
359/// use pyo3::prelude::*;
360///
361/// /// Awaitable sleep function
362/// #[pyfunction]
363/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
364/// let secs = secs.extract()?;
365/// pyo3_async_runtimes::tokio::future_into_py(py, async move {
366/// tokio::time::sleep(Duration::from_secs(secs)).await;
367/// Ok(())
368/// })
369/// }
370/// ```
pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
where
    F: Future<Output = PyResult<T>> + Send + 'static,
    T: for<'py> IntoPyObject<'py> + Send + 'static,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::future_into_py::<TokioRuntime, _, T>(py, fut)
}
378
379/// Convert a `!Send` Rust Future into a Python awaitable
380///
381/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
382/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
383///
384/// Python `contextvars` are preserved when calling async Python functions within the Rust future
385/// via [`into_future`] (new behaviour in `v0.15`).
386///
387/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
388/// > unfortunately fail to resolve them when called within the Rust future. This is because the
389/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
390/// >
391/// > As a workaround, you can get the `contextvars` from the current task locals using
392/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
393/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
394/// > synchronous function, and restore the previous context when it returns or raises an exception.
395///
396/// # Arguments
397/// * `py` - PyO3 GIL guard
398/// * `locals` - The task locals for the given future
399/// * `fut` - The Rust future to be converted
400///
401/// # Examples
402///
403/// ```
404/// use std::{rc::Rc, time::Duration};
405///
406/// use pyo3::prelude::*;
407///
408/// /// Awaitable non-send sleep function
409/// #[pyfunction]
410/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
411/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
412/// let secs = Rc::new(secs);
413///
414/// pyo3_async_runtimes::tokio::local_future_into_py_with_locals(
415/// py,
416/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
417/// async move {
418/// tokio::time::sleep(Duration::from_secs(*secs)).await;
419/// Python::attach(|py| Ok(py.None()))
420/// }
421/// )
422/// }
423///
424/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
425/// #[pyo3_async_runtimes::tokio::main]
426/// async fn main() -> PyResult<()> {
427/// let locals = Python::attach(|py| -> PyResult<_> {
428/// pyo3_async_runtimes::tokio::get_current_locals(py)
429/// })?;
430///
431/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
432/// // we use spawn_blocking in order to use LocalSet::block_on
433/// tokio::task::spawn_blocking(move || {
434/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
435/// // pyo3_async_runtimes::tokio::local_future_into_py will panic.
436/// tokio::task::LocalSet::new().block_on(
437/// pyo3_async_runtimes::tokio::get_runtime(),
438/// pyo3_async_runtimes::tokio::scope_local(locals, async {
439/// Python::attach(|py| {
440/// let py_future = sleep_for(py, 1)?;
441/// pyo3_async_runtimes::tokio::into_future(py_future)
442/// })?
443/// .await?;
444///
445/// Ok(())
446/// })
447/// )
448/// }).await.unwrap()
449/// }
450/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
451/// # fn main() {}
452/// ```
#[deprecated(
    since = "0.18.0",
    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
)]
// NOTE(review): presumably required because the `generic` function called below is deprecated as
// well — confirm against `generic`.
#[allow(deprecated)]
pub fn local_future_into_py_with_locals<F, T>(
    py: Python,
    locals: TaskLocals,
    fut: F,
) -> PyResult<Bound<PyAny>>
where
    F: Future<Output = PyResult<T>> + 'static,
    T: for<'py> IntoPyObject<'py>,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter. Unlike `future_into_py_with_locals`
    // there is no `Send` bound on `F` or `T`.
    generic::local_future_into_py_with_locals::<TokioRuntime, _, T>(py, locals, fut)
}
469
470/// Convert a `!Send` Rust Future into a Python awaitable
471///
472/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
473/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
474///
475/// Python `contextvars` are preserved when calling async Python functions within the Rust future
476/// via [`into_future`] (new behaviour in `v0.15`).
477///
478/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
479/// > unfortunately fail to resolve them when called within the Rust future. This is because the
480/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
481/// >
482/// > As a workaround, you can get the `contextvars` from the current task locals using
483/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
484/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
485/// > synchronous function, and restore the previous context when it returns or raises an exception.
486///
487/// # Arguments
488/// * `py` - The current PyO3 GIL guard
489/// * `fut` - The Rust future to be converted
490///
491/// # Examples
492///
493/// ```
494/// use std::{rc::Rc, time::Duration};
495///
496/// use pyo3::prelude::*;
497///
498/// /// Awaitable non-send sleep function
499/// #[pyfunction]
500/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
501/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::tokio::future_into_py
502/// let secs = Rc::new(secs);
503/// pyo3_async_runtimes::tokio::local_future_into_py(py, async move {
504/// tokio::time::sleep(Duration::from_secs(*secs)).await;
505/// Ok(())
506/// })
507/// }
508///
509/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))]
510/// #[pyo3_async_runtimes::tokio::main]
511/// async fn main() -> PyResult<()> {
512/// let locals = Python::attach(|py| {
513/// pyo3_async_runtimes::tokio::get_current_locals(py).unwrap()
514/// });
515///
516/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead
517/// // we use spawn_blocking in order to use LocalSet::block_on
518/// tokio::task::spawn_blocking(move || {
519/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to
520/// // pyo3_async_runtimes::tokio::local_future_into_py will panic.
521/// tokio::task::LocalSet::new().block_on(
522/// pyo3_async_runtimes::tokio::get_runtime(),
523/// pyo3_async_runtimes::tokio::scope_local(locals, async {
524/// Python::attach(|py| {
525/// let py_future = sleep_for(py, 1)?;
526/// pyo3_async_runtimes::tokio::into_future(py_future)
527/// })?
528/// .await?;
529///
530/// Ok(())
531/// })
532/// )
533/// }).await.unwrap()
534/// }
535/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))]
536/// # fn main() {}
537/// ```
#[deprecated(
    since = "0.18.0",
    note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
)]
// NOTE(review): presumably required because the `generic` function called below is deprecated as
// well — confirm against `generic`.
#[allow(deprecated)]
pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
where
    F: Future<Output = PyResult<T>> + 'static,
    T: for<'py> IntoPyObject<'py>,
{
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter. Unlike `future_into_py` there is
    // no `Send` bound on `F` or `T`.
    generic::local_future_into_py::<TokioRuntime, _, T>(py, fut)
}
550
551/// Convert a Python `awaitable` into a Rust Future
552///
553/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
554/// completion handler sends the result of this Task through a
555/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
556/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
557///
558/// # Arguments
559/// * `awaitable` - The Python `awaitable` to be converted
560///
561/// # Examples
562///
563/// ```
564/// use std::time::Duration;
565/// use std::ffi::CString;
566///
567/// use pyo3::prelude::*;
568///
569/// const PYTHON_CODE: &'static str = r#"
570/// import asyncio
571///
572/// async def py_sleep(duration):
573/// await asyncio.sleep(duration)
574/// "#;
575///
576/// async fn py_sleep(seconds: f32) -> PyResult<()> {
577/// let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
578/// Ok(
579/// PyModule::from_code(
580/// py,
581/// &CString::new(PYTHON_CODE).unwrap(),
582/// &CString::new("test_into_future/test_mod.py").unwrap(),
583/// &CString::new("test_mod").unwrap(),
584/// )?
585/// .into()
586/// )
587/// })?;
588///
589/// Python::attach(|py| {
590/// pyo3_async_runtimes::tokio::into_future(
591/// test_mod
592/// .call_method1(py, "py_sleep", (seconds,))?
593/// .into_bound(py),
594/// )
595/// })?
596/// .await?;
597/// Ok(())
598/// }
599/// ```
pub fn into_future(
    awaitable: Bound<PyAny>,
) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
    // Thin wrapper: forwards to the runtime-agnostic implementation in `generic`, selecting the
    // Tokio runtime through the `TokioRuntime` type parameter.
    generic::into_future::<TokioRuntime>(awaitable)
}
605
606/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
607///
608/// **This API is marked as unstable** and is only available when the
609/// `unstable-streams` crate feature is enabled. This comes with no
610/// stability guarantees, and could be changed or removed at any time.
611///
612/// # Arguments
613/// * `locals` - The current task locals
614/// * `gen` - The Python async generator to be converted
615///
616/// # Examples
617/// ```
618/// use pyo3::prelude::*;
619/// use futures::{StreamExt, TryStreamExt};
620/// use std::ffi::CString;
621///
622/// const TEST_MOD: &str = r#"
623/// import asyncio
624///
625/// async def gen():
626/// for i in range(10):
627/// await asyncio.sleep(0.1)
628/// yield i
629/// "#;
630///
631/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
632/// # #[pyo3_async_runtimes::tokio::main]
633/// # async fn main() -> PyResult<()> {
634/// let stream = Python::attach(|py| {
635/// let test_mod = PyModule::from_code(
636/// py,
637/// &CString::new(TEST_MOD).unwrap(),
638/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
639/// &CString::new("test_mod").unwrap(),
640/// )?;
641///
642/// pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
643/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
644/// test_mod.call_method0("gen")?
645/// )
646/// })?;
647///
648/// let vals = stream
649/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
650/// .try_collect::<Vec<i32>>()
651/// .await?;
652///
653/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
654///
655/// Ok(())
656/// # }
657/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
658/// # fn main() {}
659/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v1(
    locals: TaskLocals,
    // NOTE(review): `gen` is a reserved keyword in the Rust 2024 edition; rename the parameter
    // before migrating editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin wrapper over the runtime-agnostic implementation in `generic`, instantiated for the
    // Tokio runtime. The v1 stream yields `PyResult` items.
    generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}
667
668/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
669///
670/// **This API is marked as unstable** and is only available when the
671/// `unstable-streams` crate feature is enabled. This comes with no
672/// stability guarantees, and could be changed or removed at any time.
673///
674/// # Arguments
675/// * `gen` - The Python async generator to be converted
676///
677/// # Examples
678/// ```
679/// use pyo3::prelude::*;
680/// use futures::{StreamExt, TryStreamExt};
681/// use std::ffi::CString;
682///
683/// const TEST_MOD: &str = r#"
684/// import asyncio
685///
686/// async def gen():
687/// for i in range(10):
688/// await asyncio.sleep(0.1)
689/// yield i
690/// "#;
691///
692/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
693/// # #[pyo3_async_runtimes::tokio::main]
694/// # async fn main() -> PyResult<()> {
695/// let stream = Python::attach(|py| {
696/// let test_mod = PyModule::from_code(
697/// py,
698/// &CString::new(TEST_MOD).unwrap(),
699/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
700/// &CString::new("test_mod").unwrap(),
701/// )?;
702///
703/// pyo3_async_runtimes::tokio::into_stream_v1(test_mod.call_method0("gen")?)
704/// })?;
705///
706/// let vals = stream
707/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
708/// .try_collect::<Vec<i32>>()
709/// .await?;
710///
711/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
712///
713/// Ok(())
714/// # }
715/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
716/// # fn main() {}
717/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
    // NOTE(review): `gen` is a reserved keyword in the Rust 2024 edition; rename the parameter
    // before migrating editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
    // Thin wrapper over the runtime-agnostic implementation in `generic`, instantiated for the
    // Tokio runtime. The v1 stream yields `PyResult` items.
    generic::into_stream_v1::<TokioRuntime>(gen)
}
724
725/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
726///
727/// **This API is marked as unstable** and is only available when the
728/// `unstable-streams` crate feature is enabled. This comes with no
729/// stability guarantees, and could be changed or removed at any time.
730///
731/// # Arguments
732/// * `locals` - The current task locals
733/// * `gen` - The Python async generator to be converted
734///
735/// # Examples
736/// ```
737/// use pyo3::prelude::*;
738/// use futures::{StreamExt, TryStreamExt};
739/// use std::ffi::CString;
740///
741/// const TEST_MOD: &str = r#"
742/// import asyncio
743///
744/// async def gen():
745/// for i in range(10):
746/// await asyncio.sleep(0.1)
747/// yield i
748/// "#;
749///
750/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
751/// # #[pyo3_async_runtimes::tokio::main]
752/// # async fn main() -> PyResult<()> {
753/// let stream = Python::attach(|py| {
754/// let test_mod = PyModule::from_code(
755/// py,
756/// &CString::new(TEST_MOD).unwrap(),
757/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
758/// &CString::new("test_mod").unwrap(),
759/// )?;
760///
761/// pyo3_async_runtimes::tokio::into_stream_with_locals_v2(
762/// pyo3_async_runtimes::tokio::get_current_locals(py)?,
763/// test_mod.call_method0("gen")?
764/// )
765/// })?;
766///
767/// let vals = stream
768/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
769/// .try_collect::<Vec<i32>>()
770/// .await?;
771///
772/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
773///
774/// Ok(())
775/// # }
776/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
777/// # fn main() {}
778/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2(
    locals: TaskLocals,
    // NOTE(review): `gen` is a reserved keyword in the Rust 2024 edition; rename the parameter
    // before migrating editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin wrapper over the runtime-agnostic implementation in `generic`, instantiated for the
    // Tokio runtime. Unlike the v1 variant, the stream items are plain `Py<PyAny>` values rather
    // than `PyResult`s.
    generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}
786
787/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
788///
789/// **This API is marked as unstable** and is only available when the
790/// `unstable-streams` crate feature is enabled. This comes with no
791/// stability guarantees, and could be changed or removed at any time.
792///
793/// # Arguments
794/// * `gen` - The Python async generator to be converted
795///
796/// # Examples
797/// ```
798/// use pyo3::prelude::*;
799/// use futures::{StreamExt, TryStreamExt};
800/// use std::ffi::CString;
801///
802/// const TEST_MOD: &str = r#"
803/// import asyncio
804///
805/// async def gen():
806/// for i in range(10):
807/// await asyncio.sleep(0.1)
808/// yield i
809/// "#;
810///
811/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))]
812/// # #[pyo3_async_runtimes::tokio::main]
813/// # async fn main() -> PyResult<()> {
814/// let stream = Python::attach(|py| {
815/// let test_mod = PyModule::from_code(
816/// py,
817/// &CString::new(TEST_MOD).unwrap(),
818/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
819/// &CString::new("test_mod").unwrap(),
820/// )?;
821///
822/// pyo3_async_runtimes::tokio::into_stream_v2(test_mod.call_method0("gen")?)
823/// })?;
824///
825/// let vals = stream
826/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
827/// .try_collect::<Vec<i32>>()
828/// .await?;
829///
830/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
831///
832/// Ok(())
833/// # }
834/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))]
835/// # fn main() {}
836/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
    // NOTE(review): `gen` is a reserved keyword in the Rust 2024 edition; rename the parameter
    // before migrating editions.
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
    // Thin wrapper over the runtime-agnostic implementation in `generic`, instantiated for the
    // Tokio runtime. Unlike the v1 variant, the stream items are plain `Py<PyAny>` values rather
    // than `PyResult`s.
    generic::into_stream_v2::<TokioRuntime>(gen)
}