pyo3_asyncio/generic.rs
1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio]
12//! version = "0.20"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17 future::Future,
18 marker::PhantomData,
19 pin::Pin,
20 sync::{Arc, Mutex},
21 task::{Context, Poll},
22};
23
24use futures::{
25 channel::{mpsc, oneshot},
26 SinkExt,
27};
28use once_cell::sync::OnceCell;
29use pin_project_lite::pin_project;
30use pyo3::prelude::*;
31
32use crate::{
33 asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
34 get_running_loop, into_future_with_locals, TaskLocals,
35};
36
37/// Generic utilities for a JoinError
38pub trait JoinError {
39 /// Check if the spawned task exited because of a panic
40 fn is_panic(&self) -> bool;
41 /// Get the panic object associated with the error. Panics if `is_panic` is not true.
42 fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static>;
43}
44
45/// Generic Rust async/await runtime
46pub trait Runtime: Send + 'static {
47 /// The error returned by a JoinHandle after being awaited
48 type JoinError: JoinError + Send;
49 /// A future that completes with the result of the spawned task
50 type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
51
52 /// Spawn a future onto this runtime's event loop
53 fn spawn<F>(fut: F) -> Self::JoinHandle
54 where
55 F: Future<Output = ()> + Send + 'static;
56}
57
58/// Extension trait for async/await runtimes that support spawning local tasks
59pub trait SpawnLocalExt: Runtime {
60 /// Spawn a !Send future onto this runtime's event loop
61 fn spawn_local<F>(fut: F) -> Self::JoinHandle
62 where
63 F: Future<Output = ()> + 'static;
64}
65
66/// Exposes the utilities necessary for using task-local data in the Runtime
67pub trait ContextExt: Runtime {
68 /// Set the task locals for the given future
69 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
70 where
71 F: Future<Output = R> + Send + 'static;
72
73 /// Get the task locals for the current task
74 fn get_task_locals() -> Option<TaskLocals>;
75}
76
77/// Adds the ability to scope task-local data for !Send futures
78pub trait LocalContextExt: Runtime {
79 /// Set the task locals for the given !Send future
80 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
81 where
82 F: Future<Output = R> + 'static;
83}
84
85/// Get the current event loop from either Python or Rust async task local context
86///
87/// This function first checks if the runtime has a task-local reference to the Python event loop.
88/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
89/// with the current OS thread.
90pub fn get_current_loop<R>(py: Python) -> PyResult<&PyAny>
91where
92 R: ContextExt,
93{
94 if let Some(locals) = R::get_task_locals() {
95 Ok(locals.event_loop.into_ref(py))
96 } else {
97 get_running_loop(py)
98 }
99}
100
101/// Either copy the task locals from the current task OR get the current running loop and
102/// contextvars from Python.
103pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
104where
105 R: ContextExt,
106{
107 if let Some(locals) = R::get_task_locals() {
108 Ok(locals)
109 } else {
110 Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
111 }
112}
113
114/// Run the event loop until the given Future completes
115///
116/// After this function returns, the event loop can be resumed with [`run_until_complete`]
117///
118/// # Arguments
119/// * `event_loop` - The Python event loop that should run the future
120/// * `fut` - The future to drive to completion
121///
122/// # Examples
123///
124/// ```no_run
125/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
126/// #
127/// # use pyo3_asyncio::{
128/// # TaskLocals,
129/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
130/// # };
131/// #
132/// # struct MyCustomJoinError;
133/// #
134/// # impl JoinError for MyCustomJoinError {
135/// # fn is_panic(&self) -> bool {
136/// # unreachable!()
137/// # }
138/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
139/// # unreachable!()
140/// # }
141/// # }
142/// #
143/// # struct MyCustomJoinHandle;
144/// #
145/// # impl Future for MyCustomJoinHandle {
146/// # type Output = Result<(), MyCustomJoinError>;
147/// #
148/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
149/// # unreachable!()
150/// # }
151/// # }
152/// #
153/// # struct MyCustomRuntime;
154/// #
155/// # impl Runtime for MyCustomRuntime {
156/// # type JoinError = MyCustomJoinError;
157/// # type JoinHandle = MyCustomJoinHandle;
158/// #
159/// # fn spawn<F>(fut: F) -> Self::JoinHandle
160/// # where
161/// # F: Future<Output = ()> + Send + 'static
162/// # {
163/// # unreachable!()
164/// # }
165/// # }
166/// #
167/// # impl ContextExt for MyCustomRuntime {
168/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
169/// # where
170/// # F: Future<Output = R> + Send + 'static
171/// # {
172/// # unreachable!()
173/// # }
174/// # fn get_task_locals() -> Option<TaskLocals> {
175/// # unreachable!()
176/// # }
177/// # }
178/// #
179/// # use std::time::Duration;
180/// #
181/// # use pyo3::prelude::*;
182/// #
183/// # Python::with_gil(|py| -> PyResult<()> {
184/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
185/// # #[cfg(feature = "tokio-runtime")]
186/// pyo3_asyncio::generic::run_until_complete::<MyCustomRuntime, _, _>(event_loop, async move {
187/// tokio::time::sleep(Duration::from_secs(1)).await;
188/// Ok(())
189/// })?;
190/// # Ok(())
191/// # }).unwrap();
192/// ```
193pub fn run_until_complete<R, F, T>(event_loop: &PyAny, fut: F) -> PyResult<T>
194where
195 R: Runtime + ContextExt,
196 F: Future<Output = PyResult<T>> + Send + 'static,
197 T: Send + Sync + 'static,
198{
199 let py = event_loop.py();
200 let result_tx = Arc::new(Mutex::new(None));
201 let result_rx = Arc::clone(&result_tx);
202 let coro = future_into_py_with_locals::<R, _, ()>(
203 py,
204 TaskLocals::new(event_loop).copy_context(py)?,
205 async move {
206 let val = fut.await?;
207 if let Ok(mut result) = result_tx.lock() {
208 *result = Some(val);
209 }
210 Ok(())
211 },
212 )?;
213
214 event_loop.call_method1("run_until_complete", (coro,))?;
215
216 let result = result_rx.lock().unwrap().take().unwrap();
217 Ok(result)
218}
219
220/// Run the event loop until the given Future completes
221///
222/// # Arguments
223/// * `py` - The current PyO3 GIL guard
224/// * `fut` - The future to drive to completion
225///
226/// # Examples
227///
228/// ```no_run
229/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
230/// #
231/// # use pyo3_asyncio::{
232/// # TaskLocals,
233/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
234/// # };
235/// #
236/// # struct MyCustomJoinError;
237/// #
238/// # impl JoinError for MyCustomJoinError {
239/// # fn is_panic(&self) -> bool {
240/// # unreachable!()
241/// # }
242/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
243/// # unreachable!()
244/// # }
245/// # }
246/// #
247/// # struct MyCustomJoinHandle;
248/// #
249/// # impl Future for MyCustomJoinHandle {
250/// # type Output = Result<(), MyCustomJoinError>;
251/// #
252/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
253/// # unreachable!()
254/// # }
255/// # }
256/// #
257/// # struct MyCustomRuntime;
258/// #
259/// # impl Runtime for MyCustomRuntime {
260/// # type JoinError = MyCustomJoinError;
261/// # type JoinHandle = MyCustomJoinHandle;
262/// #
263/// # fn spawn<F>(fut: F) -> Self::JoinHandle
264/// # where
265/// # F: Future<Output = ()> + Send + 'static
266/// # {
267/// # unreachable!()
268/// # }
269/// # }
270/// #
271/// # impl ContextExt for MyCustomRuntime {
272/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
273/// # where
274/// # F: Future<Output = R> + Send + 'static
275/// # {
276/// # unreachable!()
277/// # }
278/// # fn get_task_locals() -> Option<TaskLocals> {
279/// # unreachable!()
280/// # }
281/// # }
282/// #
283/// # use std::time::Duration;
284/// # async fn custom_sleep(_duration: Duration) { }
285/// #
286/// # use pyo3::prelude::*;
287/// #
288/// fn main() {
289/// Python::with_gil(|py| {
290/// pyo3_asyncio::generic::run::<MyCustomRuntime, _, _>(py, async move {
291/// custom_sleep(Duration::from_secs(1)).await;
292/// Ok(())
293/// })
294/// .map_err(|e| {
295/// e.print_and_set_sys_last_vars(py);
296/// })
297/// .unwrap();
298/// })
299/// }
300/// ```
301pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
302where
303 R: Runtime + ContextExt,
304 F: Future<Output = PyResult<T>> + Send + 'static,
305 T: Send + Sync + 'static,
306{
307 let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
308
309 let result = run_until_complete::<R, F, T>(event_loop, fut);
310
311 close(event_loop)?;
312
313 result
314}
315
316fn cancelled(future: &PyAny) -> PyResult<bool> {
317 future.getattr("cancelled")?.call0()?.is_true()
318}
319
320#[pyclass]
321struct CheckedCompletor;
322
323#[pymethods]
324impl CheckedCompletor {
325 fn __call__(&self, future: &PyAny, complete: &PyAny, value: &PyAny) -> PyResult<()> {
326 if cancelled(future)? {
327 return Ok(());
328 }
329
330 complete.call1((value,))?;
331
332 Ok(())
333 }
334}
335
336fn set_result(event_loop: &PyAny, future: &PyAny, result: PyResult<PyObject>) -> PyResult<()> {
337 let py = event_loop.py();
338 let none = py.None().into_ref(py);
339
340 let (complete, val) = match result {
341 Ok(val) => (future.getattr("set_result")?, val.into_py(py)),
342 Err(err) => (future.getattr("set_exception")?, err.into_py(py)),
343 };
344 call_soon_threadsafe(event_loop, none, (CheckedCompletor, future, complete, val))?;
345
346 Ok(())
347}
348
349/// Convert a Python `awaitable` into a Rust Future
350///
351/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
352/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
353/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
354///
355/// # Arguments
356/// * `awaitable` - The Python `awaitable` to be converted
357///
358/// # Examples
359///
360/// ```no_run
361/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
362/// #
363/// # use pyo3::prelude::*;
364/// #
365/// # use pyo3_asyncio::{
366/// # TaskLocals,
367/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
368/// # };
369/// #
370/// # struct MyCustomJoinError;
371/// #
372/// # impl JoinError for MyCustomJoinError {
373/// # fn is_panic(&self) -> bool {
374/// # unreachable!()
375/// # }
376/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
377/// # unreachable!()
378/// # }
379/// # }
380/// #
381/// # struct MyCustomJoinHandle;
382/// #
383/// # impl Future for MyCustomJoinHandle {
384/// # type Output = Result<(), MyCustomJoinError>;
385/// #
386/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
387/// # unreachable!()
388/// # }
389/// # }
390/// #
391/// # struct MyCustomRuntime;
392/// #
393/// # impl MyCustomRuntime {
394/// # async fn sleep(_: Duration) {
395/// # unreachable!()
396/// # }
397/// # }
398/// #
399/// # impl Runtime for MyCustomRuntime {
400/// # type JoinError = MyCustomJoinError;
401/// # type JoinHandle = MyCustomJoinHandle;
402/// #
403/// # fn spawn<F>(fut: F) -> Self::JoinHandle
404/// # where
405/// # F: Future<Output = ()> + Send + 'static
406/// # {
407/// # unreachable!()
408/// # }
409/// # }
410/// #
411/// # impl ContextExt for MyCustomRuntime {
412/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
413/// # where
414/// # F: Future<Output = R> + Send + 'static
415/// # {
416/// # unreachable!()
417/// # }
418/// # fn get_task_locals() -> Option<TaskLocals> {
419/// # unreachable!()
420/// # }
421/// # }
422/// #
423/// const PYTHON_CODE: &'static str = r#"
424/// import asyncio
425///
426/// async def py_sleep(duration):
427/// await asyncio.sleep(duration)
428/// "#;
429///
430/// async fn py_sleep(seconds: f32) -> PyResult<()> {
431/// let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
432/// Ok(
433/// PyModule::from_code(
434/// py,
435/// PYTHON_CODE,
436/// "test_into_future/test_mod.py",
437/// "test_mod"
438/// )?
439/// .into()
440/// )
441/// })?;
442///
443/// Python::with_gil(|py| {
444/// pyo3_asyncio::generic::into_future::<MyCustomRuntime>(
445/// test_mod
446/// .call_method1(py, "py_sleep", (seconds.into_py(py),))?
447/// .as_ref(py),
448/// )
449/// })?
450/// .await?;
451/// Ok(())
452/// }
453/// ```
454pub fn into_future<R>(
455 awaitable: &PyAny,
456) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
457where
458 R: Runtime + ContextExt,
459{
460 into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
461}
462
463/// Convert a Rust Future into a Python awaitable with a generic runtime
464///
465/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
466/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
467///
468/// Python `contextvars` are preserved when calling async Python functions within the Rust future
469/// via [`into_future`] (new behaviour in `v0.15`).
470///
471/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
472/// unfortunately fail to resolve them when called within the Rust future. This is because the
473/// function is being called from a Rust thread, not inside an actual Python coroutine context.
474/// >
475/// > As a workaround, you can get the `contextvars` from the current task locals using
476/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
477/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
478/// synchronous function, and restore the previous context when it returns or raises an exception.
479///
480/// # Arguments
481/// * `py` - PyO3 GIL guard
482/// * `locals` - The task-local data for Python
483/// * `fut` - The Rust future to be converted
484///
485/// # Examples
486///
487/// ```no_run
488/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
489/// #
490/// # use pyo3_asyncio::{
491/// # TaskLocals,
492/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
493/// # };
494/// #
495/// # struct MyCustomJoinError;
496/// #
497/// # impl JoinError for MyCustomJoinError {
498/// # fn is_panic(&self) -> bool {
499/// # unreachable!()
500/// # }
501/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
502/// # unreachable!()
503/// # }
504/// # }
505/// #
506/// # struct MyCustomJoinHandle;
507/// #
508/// # impl Future for MyCustomJoinHandle {
509/// # type Output = Result<(), MyCustomJoinError>;
510/// #
511/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
512/// # unreachable!()
513/// # }
514/// # }
515/// #
516/// # struct MyCustomRuntime;
517/// #
518/// # impl MyCustomRuntime {
519/// # async fn sleep(_: Duration) {
520/// # unreachable!()
521/// # }
522/// # }
523/// #
524/// # impl Runtime for MyCustomRuntime {
525/// # type JoinError = MyCustomJoinError;
526/// # type JoinHandle = MyCustomJoinHandle;
527/// #
528/// # fn spawn<F>(fut: F) -> Self::JoinHandle
529/// # where
530/// # F: Future<Output = ()> + Send + 'static
531/// # {
532/// # unreachable!()
533/// # }
534/// # }
535/// #
536/// # impl ContextExt for MyCustomRuntime {
537/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
538/// # where
539/// # F: Future<Output = R> + Send + 'static
540/// # {
541/// # unreachable!()
542/// # }
543/// # fn get_task_locals() -> Option<TaskLocals> {
544/// # unreachable!()
545/// # }
546/// # }
547/// #
548/// use std::time::Duration;
549///
550/// use pyo3::prelude::*;
551///
552/// /// Awaitable sleep function
553/// #[pyfunction]
554/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
555/// let secs = secs.extract()?;
556/// pyo3_asyncio::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
557/// py,
558/// pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
559/// async move {
560/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
561/// Ok(())
562/// }
563/// )
564/// }
565/// ```
566pub fn future_into_py_with_locals<R, F, T>(
567 py: Python,
568 locals: TaskLocals,
569 fut: F,
570) -> PyResult<&PyAny>
571where
572 R: Runtime + ContextExt,
573 F: Future<Output = PyResult<T>> + Send + 'static,
574 T: IntoPy<PyObject>,
575{
576 let (cancel_tx, cancel_rx) = oneshot::channel();
577
578 let py_fut = create_future(locals.event_loop.clone().into_ref(py))?;
579 py_fut.call_method1(
580 "add_done_callback",
581 (PyDoneCallback {
582 cancel_tx: Some(cancel_tx),
583 },),
584 )?;
585
586 let future_tx1 = PyObject::from(py_fut);
587 let future_tx2 = future_tx1.clone();
588
589 R::spawn(async move {
590 let locals2 = locals.clone();
591
592 if let Err(e) = R::spawn(async move {
593 let result = R::scope(
594 locals2.clone(),
595 Cancellable::new_with_cancel_rx(fut, cancel_rx),
596 )
597 .await;
598
599 Python::with_gil(move |py| {
600 if cancelled(future_tx1.as_ref(py))
601 .map_err(dump_err(py))
602 .unwrap_or(false)
603 {
604 return;
605 }
606
607 let _ = set_result(
608 locals2.event_loop(py),
609 future_tx1.as_ref(py),
610 result.map(|val| val.into_py(py)),
611 )
612 .map_err(dump_err(py));
613 });
614 })
615 .await
616 {
617 if e.is_panic() {
618 Python::with_gil(move |py| {
619 if cancelled(future_tx2.as_ref(py))
620 .map_err(dump_err(py))
621 .unwrap_or(false)
622 {
623 return;
624 }
625
626 let panic_message = format!(
627 "rust future panicked: {}",
628 get_panic_message(&e.into_panic())
629 );
630 let _ = set_result(
631 locals.event_loop.as_ref(py),
632 future_tx2.as_ref(py),
633 Err(RustPanic::new_err(panic_message)),
634 )
635 .map_err(dump_err(py));
636 });
637 }
638 }
639 });
640
641 Ok(py_fut)
642}
643
644fn get_panic_message(any: &dyn std::any::Any) -> &str {
645 if let Some(str_slice) = any.downcast_ref::<&str>() {
646 str_slice
647 } else if let Some(string) = any.downcast_ref::<String>() {
648 string
649 } else {
650 "unknown error"
651 }
652}
653
654pin_project! {
655 /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
656 #[must_use = "futures do nothing unless you `.await` or poll them"]
657 #[derive(Debug)]
658 struct Cancellable<T> {
659 #[pin]
660 future: T,
661 #[pin]
662 cancel_rx: oneshot::Receiver<()>,
663
664 poll_cancel_rx: bool
665 }
666}
667
668impl<T> Cancellable<T> {
669 fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
670 Self {
671 future,
672 cancel_rx,
673
674 poll_cancel_rx: true,
675 }
676 }
677}
678
679impl<F, T> Future for Cancellable<F>
680where
681 F: Future<Output = PyResult<T>>,
682 T: IntoPy<PyObject>,
683{
684 type Output = F::Output;
685
686 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
687 let this = self.project();
688
689 // First, try polling the future
690 if let Poll::Ready(v) = this.future.poll(cx) {
691 return Poll::Ready(v);
692 }
693
694 // Now check for cancellation
695 if *this.poll_cancel_rx {
696 match this.cancel_rx.poll(cx) {
697 Poll::Ready(Ok(())) => {
698 *this.poll_cancel_rx = false;
699 // The python future has already been cancelled, so this return value will never
700 // be used.
701 Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
702 "unreachable",
703 )))
704 }
705 Poll::Ready(Err(_)) => {
706 *this.poll_cancel_rx = false;
707 Poll::Pending
708 }
709 Poll::Pending => Poll::Pending,
710 }
711 } else {
712 Poll::Pending
713 }
714 }
715}
716
717#[pyclass]
718struct PyDoneCallback {
719 cancel_tx: Option<oneshot::Sender<()>>,
720}
721
722#[pymethods]
723impl PyDoneCallback {
724 pub fn __call__(&mut self, fut: &PyAny) -> PyResult<()> {
725 let py = fut.py();
726
727 if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
728 let _ = self.cancel_tx.take().unwrap().send(());
729 }
730
731 Ok(())
732 }
733}
734
735/// Convert a Rust Future into a Python awaitable with a generic runtime
736///
737/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
738/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
739///
740/// Python `contextvars` are preserved when calling async Python functions within the Rust future
741/// via [`into_future`] (new behaviour in `v0.15`).
742///
743/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
744/// unfortunately fail to resolve them when called within the Rust future. This is because the
745/// function is being called from a Rust thread, not inside an actual Python coroutine context.
746/// >
747/// > As a workaround, you can get the `contextvars` from the current task locals using
748/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
749/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
750/// synchronous function, and restore the previous context when it returns or raises an exception.
751///
752/// # Arguments
753/// * `py` - The current PyO3 GIL guard
754/// * `fut` - The Rust future to be converted
755///
756/// # Examples
757///
758/// ```no_run
759/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
760/// #
761/// # use pyo3_asyncio::{
762/// # TaskLocals,
763/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
764/// # };
765/// #
766/// # struct MyCustomJoinError;
767/// #
768/// # impl JoinError for MyCustomJoinError {
769/// # fn is_panic(&self) -> bool {
770/// # unreachable!()
771/// # }
772/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
773/// # unreachable!()
774/// # }
775/// # }
776/// #
777/// # struct MyCustomJoinHandle;
778/// #
779/// # impl Future for MyCustomJoinHandle {
780/// # type Output = Result<(), MyCustomJoinError>;
781/// #
782/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
783/// # unreachable!()
784/// # }
785/// # }
786/// #
787/// # struct MyCustomRuntime;
788/// #
789/// # impl MyCustomRuntime {
790/// # async fn sleep(_: Duration) {
791/// # unreachable!()
792/// # }
793/// # }
794/// #
795/// # impl Runtime for MyCustomRuntime {
796/// # type JoinError = MyCustomJoinError;
797/// # type JoinHandle = MyCustomJoinHandle;
798/// #
799/// # fn spawn<F>(fut: F) -> Self::JoinHandle
800/// # where
801/// # F: Future<Output = ()> + Send + 'static
802/// # {
803/// # unreachable!()
804/// # }
805/// # }
806/// #
807/// # impl ContextExt for MyCustomRuntime {
808/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
809/// # where
810/// # F: Future<Output = R> + Send + 'static
811/// # {
812/// # unreachable!()
813/// # }
814/// # fn get_task_locals() -> Option<TaskLocals> {
815/// # unreachable!()
816/// # }
817/// # }
818/// #
819/// use std::time::Duration;
820///
821/// use pyo3::prelude::*;
822///
823/// /// Awaitable sleep function
824/// #[pyfunction]
825/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
826/// let secs = secs.extract()?;
827/// pyo3_asyncio::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
828/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
829/// Ok(())
830/// })
831/// }
832/// ```
833pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<&PyAny>
834where
835 R: Runtime + ContextExt,
836 F: Future<Output = PyResult<T>> + Send + 'static,
837 T: IntoPy<PyObject>,
838{
839 future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
840}
841
842/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
843/// specification of task locals.
844///
845/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
846/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
847///
848/// Python `contextvars` are preserved when calling async Python functions within the Rust future
849/// via [`into_future`] (new behaviour in `v0.15`).
850///
851/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
852/// unfortunately fail to resolve them when called within the Rust future. This is because the
853/// function is being called from a Rust thread, not inside an actual Python coroutine context.
854/// >
855/// > As a workaround, you can get the `contextvars` from the current task locals using
856/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
857/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
858/// synchronous function, and restore the previous context when it returns or raises an exception.
859///
860/// # Arguments
861/// * `py` - PyO3 GIL guard
862/// * `locals` - The task locals for the future
863/// * `fut` - The Rust future to be converted
864///
865/// # Examples
866///
867/// ```no_run
868/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
869/// #
870/// # use pyo3_asyncio::{
871/// # TaskLocals,
872/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
873/// # };
874/// #
875/// # struct MyCustomJoinError;
876/// #
877/// # impl JoinError for MyCustomJoinError {
878/// # fn is_panic(&self) -> bool {
879/// # unreachable!()
880/// # }
881/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
882/// # unreachable!()
883/// # }
884/// # }
885/// #
886/// # struct MyCustomJoinHandle;
887/// #
888/// # impl Future for MyCustomJoinHandle {
889/// # type Output = Result<(), MyCustomJoinError>;
890/// #
891/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
892/// # unreachable!()
893/// # }
894/// # }
895/// #
896/// # struct MyCustomRuntime;
897/// #
898/// # impl MyCustomRuntime {
899/// # async fn sleep(_: Duration) {
900/// # unreachable!()
901/// # }
902/// # }
903/// #
904/// # impl Runtime for MyCustomRuntime {
905/// # type JoinError = MyCustomJoinError;
906/// # type JoinHandle = MyCustomJoinHandle;
907/// #
908/// # fn spawn<F>(fut: F) -> Self::JoinHandle
909/// # where
910/// # F: Future<Output = ()> + Send + 'static
911/// # {
912/// # unreachable!()
913/// # }
914/// # }
915/// #
916/// # impl ContextExt for MyCustomRuntime {
917/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
918/// # where
919/// # F: Future<Output = R> + Send + 'static
920/// # {
921/// # unreachable!()
922/// # }
923/// # fn get_task_locals() -> Option<TaskLocals> {
924/// # unreachable!()
925/// # }
926/// # }
927/// #
928/// # impl SpawnLocalExt for MyCustomRuntime {
929/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
930/// # where
931/// # F: Future<Output = ()> + 'static
932/// # {
933/// # unreachable!()
934/// # }
935/// # }
936/// #
937/// # impl LocalContextExt for MyCustomRuntime {
938/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
939/// # where
940/// # F: Future<Output = R> + 'static
941/// # {
942/// # unreachable!()
943/// # }
944/// # }
945/// #
946/// use std::{rc::Rc, time::Duration};
947///
948/// use pyo3::prelude::*;
949///
950/// /// Awaitable sleep function
951/// #[pyfunction]
952/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
953/// // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py
954/// let secs = Rc::new(secs);
955///
956/// pyo3_asyncio::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
957/// py,
958/// pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
959/// async move {
960/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
961/// Ok(())
962/// }
963/// )
964/// }
965/// ```
966#[deprecated(
967 since = "0.18.0",
968 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
969)]
970pub fn local_future_into_py_with_locals<R, F, T>(
971 py: Python,
972 locals: TaskLocals,
973 fut: F,
974) -> PyResult<&PyAny>
975where
976 R: Runtime + SpawnLocalExt + LocalContextExt,
977 F: Future<Output = PyResult<T>> + 'static,
978 T: IntoPy<PyObject>,
979{
980 let (cancel_tx, cancel_rx) = oneshot::channel();
981
982 let py_fut = create_future(locals.event_loop.clone().into_ref(py))?;
983 py_fut.call_method1(
984 "add_done_callback",
985 (PyDoneCallback {
986 cancel_tx: Some(cancel_tx),
987 },),
988 )?;
989
990 let future_tx1 = PyObject::from(py_fut);
991 let future_tx2 = future_tx1.clone();
992
993 R::spawn_local(async move {
994 let locals2 = locals.clone();
995
996 if let Err(e) = R::spawn_local(async move {
997 let result = R::scope_local(
998 locals2.clone(),
999 Cancellable::new_with_cancel_rx(fut, cancel_rx),
1000 )
1001 .await;
1002
1003 Python::with_gil(move |py| {
1004 if cancelled(future_tx1.as_ref(py))
1005 .map_err(dump_err(py))
1006 .unwrap_or(false)
1007 {
1008 return;
1009 }
1010
1011 let _ = set_result(
1012 locals2.event_loop.as_ref(py),
1013 future_tx1.as_ref(py),
1014 result.map(|val| val.into_py(py)),
1015 )
1016 .map_err(dump_err(py));
1017 });
1018 })
1019 .await
1020 {
1021 if e.is_panic() {
1022 Python::with_gil(move |py| {
1023 if cancelled(future_tx2.as_ref(py))
1024 .map_err(dump_err(py))
1025 .unwrap_or(false)
1026 {
1027 return;
1028 }
1029
1030 let panic_message = format!(
1031 "rust future panicked: {}",
1032 get_panic_message(&e.into_panic())
1033 );
1034 let _ = set_result(
1035 locals.event_loop.as_ref(py),
1036 future_tx2.as_ref(py),
1037 Err(RustPanic::new_err(panic_message)),
1038 )
1039 .map_err(dump_err(py));
1040 });
1041 }
1042 }
1043 });
1044
1045 Ok(py_fut)
1046}
1047
1048/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1049///
1050/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1051/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1052///
1053/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1054/// via [`into_future`] (new behaviour in `v0.15`).
1055///
1056/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
1057/// unfortunately fail to resolve them when called within the Rust future. This is because the
1058/// function is being called from a Rust thread, not inside an actual Python coroutine context.
1059/// >
1060/// > As a workaround, you can get the `contextvars` from the current task locals using
1061/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
1062/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
1063/// synchronous function, and restore the previous context when it returns or raises an exception.
1064///
1065/// # Arguments
1066/// * `py` - The current PyO3 GIL guard
1067/// * `fut` - The Rust future to be converted
1068///
1069/// # Examples
1070///
1071/// ```no_run
1072/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1073/// #
1074/// # use pyo3_asyncio::{
1075/// # TaskLocals,
1076/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1077/// # };
1078/// #
1079/// # struct MyCustomJoinError;
1080/// #
1081/// # impl JoinError for MyCustomJoinError {
1082/// # fn is_panic(&self) -> bool {
1083/// # unreachable!()
1084/// # }
1085/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1086/// # unreachable!()
1087/// # }
1088/// # }
1089/// #
1090/// # struct MyCustomJoinHandle;
1091/// #
1092/// # impl Future for MyCustomJoinHandle {
1093/// # type Output = Result<(), MyCustomJoinError>;
1094/// #
1095/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1096/// # unreachable!()
1097/// # }
1098/// # }
1099/// #
1100/// # struct MyCustomRuntime;
1101/// #
1102/// # impl MyCustomRuntime {
1103/// # async fn sleep(_: Duration) {
1104/// # unreachable!()
1105/// # }
1106/// # }
1107/// #
1108/// # impl Runtime for MyCustomRuntime {
1109/// # type JoinError = MyCustomJoinError;
1110/// # type JoinHandle = MyCustomJoinHandle;
1111/// #
1112/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1113/// # where
1114/// # F: Future<Output = ()> + Send + 'static
1115/// # {
1116/// # unreachable!()
1117/// # }
1118/// # }
1119/// #
1120/// # impl ContextExt for MyCustomRuntime {
1121/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1122/// # where
1123/// # F: Future<Output = R> + Send + 'static
1124/// # {
1125/// # unreachable!()
1126/// # }
1127/// # fn get_task_locals() -> Option<TaskLocals> {
1128/// # unreachable!()
1129/// # }
1130/// # }
1131/// #
1132/// # impl SpawnLocalExt for MyCustomRuntime {
1133/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
1134/// # where
1135/// # F: Future<Output = ()> + 'static
1136/// # {
1137/// # unreachable!()
1138/// # }
1139/// # }
1140/// #
1141/// # impl LocalContextExt for MyCustomRuntime {
1142/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1143/// # where
1144/// # F: Future<Output = R> + 'static
1145/// # {
1146/// # unreachable!()
1147/// # }
1148/// # }
1149/// #
1150/// use std::{rc::Rc, time::Duration};
1151///
1152/// use pyo3::prelude::*;
1153///
1154/// /// Awaitable sleep function
1155/// #[pyfunction]
1156/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> {
1157/// // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py
1158/// let secs = Rc::new(secs);
1159///
1160/// pyo3_asyncio::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1161/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1162/// Ok(())
1163/// })
1164/// }
1165/// ```
1166#[deprecated(
1167 since = "0.18.0",
1168 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1169)]
1170#[allow(deprecated)]
1171pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<&PyAny>
1172where
1173 R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1174 F: Future<Output = PyResult<T>> + 'static,
1175 T: IntoPy<PyObject>,
1176{
1177 local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1178}
1179
1180/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1181///
1182/// **This API is marked as unstable** and is only available when the
1183/// `unstable-streams` crate feature is enabled. This comes with no
1184/// stability guarantees, and could be changed or removed at any time.
1185///
1186/// # Arguments
1187/// * `locals` - The current task locals
1188/// * `gen` - The Python async generator to be converted
1189///
1190/// # Examples
1191/// ```no_run
1192/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1193/// #
1194/// # use pyo3_asyncio::{
1195/// # TaskLocals,
1196/// # generic::{JoinError, ContextExt, Runtime}
1197/// # };
1198/// #
1199/// # struct MyCustomJoinError;
1200/// #
1201/// # impl JoinError for MyCustomJoinError {
1202/// # fn is_panic(&self) -> bool {
1203/// # unreachable!()
1204/// # }
1205/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1206/// # unreachable!()
1207/// # }
1208/// # }
1209/// #
1210/// # struct MyCustomJoinHandle;
1211/// #
1212/// # impl Future for MyCustomJoinHandle {
1213/// # type Output = Result<(), MyCustomJoinError>;
1214/// #
1215/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1216/// # unreachable!()
1217/// # }
1218/// # }
1219/// #
1220/// # struct MyCustomRuntime;
1221/// #
1222/// # impl Runtime for MyCustomRuntime {
1223/// # type JoinError = MyCustomJoinError;
1224/// # type JoinHandle = MyCustomJoinHandle;
1225/// #
1226/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1227/// # where
1228/// # F: Future<Output = ()> + Send + 'static
1229/// # {
1230/// # unreachable!()
1231/// # }
1232/// # }
1233/// #
1234/// # impl ContextExt for MyCustomRuntime {
1235/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1236/// # where
1237/// # F: Future<Output = R> + Send + 'static
1238/// # {
1239/// # unreachable!()
1240/// # }
1241/// # fn get_task_locals() -> Option<TaskLocals> {
1242/// # unreachable!()
1243/// # }
1244/// # }
1245///
1246/// use pyo3::prelude::*;
1247/// use futures::{StreamExt, TryStreamExt};
1248///
1249/// const TEST_MOD: &str = r#"
1250/// import asyncio
1251///
1252/// async def gen():
1253/// for i in range(10):
1254/// await asyncio.sleep(0.1)
1255/// yield i
1256/// "#;
1257///
1258/// # async fn test_async_gen() -> PyResult<()> {
1259/// let stream = Python::with_gil(|py| {
1260/// let test_mod = PyModule::from_code(
1261/// py,
1262/// TEST_MOD,
1263/// "test_rust_coroutine/test_mod.py",
1264/// "test_mod",
1265/// )?;
1266///
1267/// pyo3_asyncio::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1268/// pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
1269/// test_mod.call_method0("gen")?
1270/// )
1271/// })?;
1272///
1273/// let vals = stream
1274/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
1275/// .try_collect::<Vec<i32>>()
1276/// .await?;
1277///
1278/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1279///
1280/// Ok(())
1281/// # }
1282/// ```
1283#[cfg(feature = "unstable-streams")]
1284pub fn into_stream_with_locals_v1<'p, R>(
1285 locals: TaskLocals,
1286 gen: &'p PyAny,
1287) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1288where
1289 R: Runtime,
1290{
1291 let (tx, rx) = async_channel::bounded(1);
1292 let anext = PyObject::from(gen.getattr("__anext__")?);
1293
1294 R::spawn(async move {
1295 loop {
1296 let fut = Python::with_gil(|py| -> PyResult<_> {
1297 into_future_with_locals(&locals, anext.as_ref(py).call0()?)
1298 });
1299 let item = match fut {
1300 Ok(fut) => match fut.await {
1301 Ok(item) => Ok(item),
1302 Err(e) => {
1303 let stop_iter = Python::with_gil(|py| {
1304 e.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
1305 });
1306
1307 if stop_iter {
1308 // end the iteration
1309 break;
1310 } else {
1311 Err(e)
1312 }
1313 }
1314 },
1315 Err(e) => Err(e),
1316 };
1317
1318 if tx.send(item).await.is_err() {
1319 // receiving side was dropped
1320 break;
1321 }
1322 }
1323 });
1324
1325 Ok(rx)
1326}
1327
1328/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1329///
1330/// **This API is marked as unstable** and is only available when the
1331/// `unstable-streams` crate feature is enabled. This comes with no
1332/// stability guarantees, and could be changed or removed at any time.
1333///
1334/// # Arguments
1335/// * `gen` - The Python async generator to be converted
1336///
1337/// # Examples
1338/// ```no_run
1339/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1340/// #
1341/// # use pyo3_asyncio::{
1342/// # TaskLocals,
1343/// # generic::{JoinError, ContextExt, Runtime}
1344/// # };
1345/// #
1346/// # struct MyCustomJoinError;
1347/// #
1348/// # impl JoinError for MyCustomJoinError {
1349/// # fn is_panic(&self) -> bool {
1350/// # unreachable!()
1351/// # }
1352/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1353/// # unreachable!()
1354/// # }
1355/// # }
1356/// #
1357/// # struct MyCustomJoinHandle;
1358/// #
1359/// # impl Future for MyCustomJoinHandle {
1360/// # type Output = Result<(), MyCustomJoinError>;
1361/// #
1362/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1363/// # unreachable!()
1364/// # }
1365/// # }
1366/// #
1367/// # struct MyCustomRuntime;
1368/// #
1369/// # impl Runtime for MyCustomRuntime {
1370/// # type JoinError = MyCustomJoinError;
1371/// # type JoinHandle = MyCustomJoinHandle;
1372/// #
1373/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1374/// # where
1375/// # F: Future<Output = ()> + Send + 'static
1376/// # {
1377/// # unreachable!()
1378/// # }
1379/// # }
1380/// #
1381/// # impl ContextExt for MyCustomRuntime {
1382/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1383/// # where
1384/// # F: Future<Output = R> + Send + 'static
1385/// # {
1386/// # unreachable!()
1387/// # }
1388/// # fn get_task_locals() -> Option<TaskLocals> {
1389/// # unreachable!()
1390/// # }
1391/// # }
1392///
1393/// use pyo3::prelude::*;
1394/// use futures::{StreamExt, TryStreamExt};
1395///
1396/// const TEST_MOD: &str = r#"
1397/// import asyncio
1398///
1399/// async def gen():
1400/// for i in range(10):
1401/// await asyncio.sleep(0.1)
1402/// yield i
1403/// "#;
1404///
1405/// # async fn test_async_gen() -> PyResult<()> {
1406/// let stream = Python::with_gil(|py| {
1407/// let test_mod = PyModule::from_code(
1408/// py,
1409/// TEST_MOD,
1410/// "test_rust_coroutine/test_mod.py",
1411/// "test_mod",
1412/// )?;
1413///
1414/// pyo3_asyncio::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1415/// })?;
1416///
1417/// let vals = stream
1418/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
1419/// .try_collect::<Vec<i32>>()
1420/// .await?;
1421///
1422/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1423///
1424/// Ok(())
1425/// # }
1426/// ```
1427#[cfg(feature = "unstable-streams")]
1428pub fn into_stream_v1<'p, R>(
1429 gen: &'p PyAny,
1430) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1431where
1432 R: Runtime + ContextExt,
1433{
1434 into_stream_with_locals_v1::<R>(get_current_locals::<R>(gen.py())?, gen)
1435}
1436
1437fn py_true() -> PyObject {
1438 static TRUE: OnceCell<PyObject> = OnceCell::new();
1439 TRUE.get_or_init(|| Python::with_gil(|py| true.into_py(py)))
1440 .clone()
1441}
1442fn py_false() -> PyObject {
1443 static FALSE: OnceCell<PyObject> = OnceCell::new();
1444 FALSE
1445 .get_or_init(|| Python::with_gil(|py| false.into_py(py)))
1446 .clone()
1447}
1448
1449trait Sender: Send + 'static {
1450 fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject>;
1451 fn close(&mut self) -> PyResult<()>;
1452}
1453
1454struct GenericSender<R>
1455where
1456 R: Runtime,
1457{
1458 runtime: PhantomData<R>,
1459 tx: mpsc::Sender<PyObject>,
1460}
1461
1462impl<R> Sender for GenericSender<R>
1463where
1464 R: Runtime + ContextExt,
1465{
1466 fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject> {
1467 match self.tx.try_send(item.clone()) {
1468 Ok(_) => Ok(py_true()),
1469 Err(e) => {
1470 if e.is_full() {
1471 let mut tx = self.tx.clone();
1472 Python::with_gil(move |py| {
1473 Ok(
1474 future_into_py_with_locals::<R, _, PyObject>(py, locals, async move {
1475 if tx.flush().await.is_err() {
1476 // receiving side disconnected
1477 return Ok(py_false());
1478 }
1479 if tx.send(item).await.is_err() {
1480 // receiving side disconnected
1481 return Ok(py_false());
1482 }
1483 Ok(py_true())
1484 })?
1485 .into(),
1486 )
1487 })
1488 } else {
1489 Ok(py_false())
1490 }
1491 }
1492 }
1493 }
1494 fn close(&mut self) -> PyResult<()> {
1495 self.tx.close_channel();
1496 Ok(())
1497 }
1498}
1499
1500#[pyclass]
1501struct SenderGlue {
1502 locals: TaskLocals,
1503 tx: Box<dyn Sender>,
1504}
1505#[pymethods]
1506impl SenderGlue {
1507 pub fn send(&mut self, item: PyObject) -> PyResult<PyObject> {
1508 self.tx.send(self.locals.clone(), item)
1509 }
1510 pub fn close(&mut self) -> PyResult<()> {
1511 self.tx.close()
1512 }
1513}
1514
1515#[cfg(feature = "unstable-streams")]
1516const STREAM_GLUE: &str = r#"
1517import asyncio
1518
1519async def forward(gen, sender):
1520 async for item in gen:
1521 should_continue = sender.send(item)
1522
1523 if asyncio.iscoroutine(should_continue):
1524 should_continue = await should_continue
1525
1526 if should_continue:
1527 continue
1528 else:
1529 break
1530
1531 sender.close()
1532"#;
1533
1534/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1535///
1536/// **This API is marked as unstable** and is only available when the
1537/// `unstable-streams` crate feature is enabled. This comes with no
1538/// stability guarantees, and could be changed or removed at any time.
1539///
1540/// # Arguments
1541/// * `locals` - The current task locals
1542/// * `gen` - The Python async generator to be converted
1543///
1544/// # Examples
1545/// ```no_run
1546/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1547/// #
1548/// # use pyo3_asyncio::{
1549/// # TaskLocals,
1550/// # generic::{JoinError, ContextExt, Runtime}
1551/// # };
1552/// #
1553/// # struct MyCustomJoinError;
1554/// #
1555/// # impl JoinError for MyCustomJoinError {
1556/// # fn is_panic(&self) -> bool {
1557/// # unreachable!()
1558/// # }
1559/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1560/// # unreachable!()
1561/// # }
1562/// # }
1563/// #
1564/// # struct MyCustomJoinHandle;
1565/// #
1566/// # impl Future for MyCustomJoinHandle {
1567/// # type Output = Result<(), MyCustomJoinError>;
1568/// #
1569/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1570/// # unreachable!()
1571/// # }
1572/// # }
1573/// #
1574/// # struct MyCustomRuntime;
1575/// #
1576/// # impl Runtime for MyCustomRuntime {
1577/// # type JoinError = MyCustomJoinError;
1578/// # type JoinHandle = MyCustomJoinHandle;
1579/// #
1580/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1581/// # where
1582/// # F: Future<Output = ()> + Send + 'static
1583/// # {
1584/// # unreachable!()
1585/// # }
1586/// # }
1587/// #
1588/// # impl ContextExt for MyCustomRuntime {
1589/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1590/// # where
1591/// # F: Future<Output = R> + Send + 'static
1592/// # {
1593/// # unreachable!()
1594/// # }
1595/// # fn get_task_locals() -> Option<TaskLocals> {
1596/// # unreachable!()
1597/// # }
1598/// # }
1599///
1600/// use pyo3::prelude::*;
1601/// use futures::{StreamExt, TryStreamExt};
1602///
1603/// const TEST_MOD: &str = r#"
1604/// import asyncio
1605///
1606/// async def gen():
1607/// for i in range(10):
1608/// await asyncio.sleep(0.1)
1609/// yield i
1610/// "#;
1611///
1612/// # async fn test_async_gen() -> PyResult<()> {
1613/// let stream = Python::with_gil(|py| {
1614/// let test_mod = PyModule::from_code(
1615/// py,
1616/// TEST_MOD,
1617/// "test_rust_coroutine/test_mod.py",
1618/// "test_mod",
1619/// )?;
1620///
1621/// pyo3_asyncio::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1622/// pyo3_asyncio::generic::get_current_locals::<MyCustomRuntime>(py)?,
1623/// test_mod.call_method0("gen")?
1624/// )
1625/// })?;
1626///
1627/// let vals = stream
1628/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
1629/// .try_collect::<Vec<i32>>()
1630/// .await?;
1631///
1632/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1633///
1634/// Ok(())
1635/// # }
1636/// ```
1637#[cfg(feature = "unstable-streams")]
1638pub fn into_stream_with_locals_v2<'p, R>(
1639 locals: TaskLocals,
1640 gen: &'p PyAny,
1641) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1642where
1643 R: Runtime + ContextExt,
1644{
1645 static GLUE_MOD: OnceCell<PyObject> = OnceCell::new();
1646 let py = gen.py();
1647 let glue = GLUE_MOD
1648 .get_or_try_init(|| -> PyResult<PyObject> {
1649 Ok(PyModule::from_code(
1650 py,
1651 STREAM_GLUE,
1652 "pyo3_asyncio/pyo3_asyncio_glue.py",
1653 "pyo3_asyncio_glue",
1654 )?
1655 .into())
1656 })?
1657 .as_ref(py);
1658
1659 let (tx, rx) = mpsc::channel(10);
1660
1661 locals.event_loop(py).call_method1(
1662 "call_soon_threadsafe",
1663 (
1664 locals.event_loop(py).getattr("create_task")?,
1665 glue.call_method1(
1666 "forward",
1667 (
1668 gen,
1669 SenderGlue {
1670 locals,
1671 tx: Box::new(GenericSender {
1672 runtime: PhantomData::<R>,
1673 tx,
1674 }),
1675 },
1676 ),
1677 )?,
1678 ),
1679 )?;
1680 Ok(rx)
1681}
1682
1683/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1684///
1685/// **This API is marked as unstable** and is only available when the
1686/// `unstable-streams` crate feature is enabled. This comes with no
1687/// stability guarantees, and could be changed or removed at any time.
1688///
1689/// # Arguments
1690/// * `gen` - The Python async generator to be converted
1691///
1692/// # Examples
1693/// ```no_run
1694/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1695/// #
1696/// # use pyo3_asyncio::{
1697/// # TaskLocals,
1698/// # generic::{JoinError, ContextExt, Runtime}
1699/// # };
1700/// #
1701/// # struct MyCustomJoinError;
1702/// #
1703/// # impl JoinError for MyCustomJoinError {
1704/// # fn is_panic(&self) -> bool {
1705/// # unreachable!()
1706/// # }
1707/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1708/// # unreachable!()
1709/// # }
1710/// # }
1711/// #
1712/// # struct MyCustomJoinHandle;
1713/// #
1714/// # impl Future for MyCustomJoinHandle {
1715/// # type Output = Result<(), MyCustomJoinError>;
1716/// #
1717/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1718/// # unreachable!()
1719/// # }
1720/// # }
1721/// #
1722/// # struct MyCustomRuntime;
1723/// #
1724/// # impl Runtime for MyCustomRuntime {
1725/// # type JoinError = MyCustomJoinError;
1726/// # type JoinHandle = MyCustomJoinHandle;
1727/// #
1728/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1729/// # where
1730/// # F: Future<Output = ()> + Send + 'static
1731/// # {
1732/// # unreachable!()
1733/// # }
1734/// # }
1735/// #
1736/// # impl ContextExt for MyCustomRuntime {
1737/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1738/// # where
1739/// # F: Future<Output = R> + Send + 'static
1740/// # {
1741/// # unreachable!()
1742/// # }
1743/// # fn get_task_locals() -> Option<TaskLocals> {
1744/// # unreachable!()
1745/// # }
1746/// # }
1747///
1748/// use pyo3::prelude::*;
1749/// use futures::{StreamExt, TryStreamExt};
1750///
1751/// const TEST_MOD: &str = r#"
1752/// import asyncio
1753///
1754/// async def gen():
1755/// for i in range(10):
1756/// await asyncio.sleep(0.1)
1757/// yield i
1758/// "#;
1759///
1760/// # async fn test_async_gen() -> PyResult<()> {
1761/// let stream = Python::with_gil(|py| {
1762/// let test_mod = PyModule::from_code(
1763/// py,
1764/// TEST_MOD,
1765/// "test_rust_coroutine/test_mod.py",
1766/// "test_mod",
1767/// )?;
1768///
1769/// pyo3_asyncio::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1770/// })?;
1771///
1772/// let vals = stream
1773/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
1774/// .try_collect::<Vec<i32>>()
1775/// .await?;
1776///
1777/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1778///
1779/// Ok(())
1780/// # }
1781/// ```
1782#[cfg(feature = "unstable-streams")]
1783pub fn into_stream_v2<'p, R>(
1784 gen: &'p PyAny,
1785) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1786where
1787 R: Runtime + ContextExt,
1788{
1789 into_stream_with_locals_v2::<R>(get_current_locals::<R>(gen.py())?, gen)
1790}