pyo3_async_runtimes/generic.rs
1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! > are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-async-runtimes]
12//! version = "0.24"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17 future::Future,
18 pin::Pin,
19 sync::{Arc, Mutex},
20 task::{Context, Poll},
21};
22
23use crate::{
24 asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
25 get_running_loop, into_future_with_locals, TaskLocals,
26};
27use futures::channel::oneshot;
28#[cfg(feature = "unstable-streams")]
29use futures::{channel::mpsc, SinkExt};
30use pin_project_lite::pin_project;
31use pyo3::prelude::*;
32use pyo3::IntoPyObjectExt;
33#[cfg(feature = "unstable-streams")]
34use std::marker::PhantomData;
35
/// Abstraction over the error a runtime's join handle yields when a spawned task fails
pub trait JoinError {
    /// Returns `true` when the spawned task terminated because it panicked
    fn is_panic(&self) -> bool;
    /// Consume the error and hand back the panic payload.
    ///
    /// Panics if `is_panic` is not true.
    fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static>;
}
43
44/// Generic Rust async/await runtime
45pub trait Runtime: Send + 'static {
46 /// The error returned by a JoinHandle after being awaited
47 type JoinError: JoinError + Send;
48 /// A future that completes with the result of the spawned task
49 type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
50
51 /// Spawn a future onto this runtime's event loop
52 fn spawn<F>(fut: F) -> Self::JoinHandle
53 where
54 F: Future<Output = ()> + Send + 'static;
55}
56
57/// Extension trait for async/await runtimes that support spawning local tasks
58pub trait SpawnLocalExt: Runtime {
59 /// Spawn a !Send future onto this runtime's event loop
60 fn spawn_local<F>(fut: F) -> Self::JoinHandle
61 where
62 F: Future<Output = ()> + 'static;
63}
64
65/// Exposes the utilities necessary for using task-local data in the Runtime
66pub trait ContextExt: Runtime {
67 /// Set the task locals for the given future
68 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
69 where
70 F: Future<Output = R> + Send + 'static;
71
72 /// Get the task locals for the current task
73 fn get_task_locals() -> Option<TaskLocals>;
74}
75
76/// Adds the ability to scope task-local data for !Send futures
77pub trait LocalContextExt: Runtime {
78 /// Set the task locals for the given !Send future
79 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
80 where
81 F: Future<Output = R> + 'static;
82}
83
84/// Get the current event loop from either Python or Rust async task local context
85///
86/// This function first checks if the runtime has a task-local reference to the Python event loop.
87/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
88/// with the current OS thread.
89pub fn get_current_loop<R>(py: Python) -> PyResult<Bound<PyAny>>
90where
91 R: ContextExt,
92{
93 if let Some(locals) = R::get_task_locals() {
94 Ok(locals.event_loop.into_bound(py))
95 } else {
96 get_running_loop(py)
97 }
98}
99
100/// Either copy the task locals from the current task OR get the current running loop and
101/// contextvars from Python.
102pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
103where
104 R: ContextExt,
105{
106 if let Some(locals) = R::get_task_locals() {
107 Ok(locals)
108 } else {
109 Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
110 }
111}
112
113/// Run the event loop until the given Future completes
114///
115/// After this function returns, the event loop can be resumed with [`run_until_complete`]
116///
117/// # Arguments
118/// * `event_loop` - The Python event loop that should run the future
119/// * `fut` - The future to drive to completion
120///
121/// # Examples
122///
123/// ```no_run
124/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
125/// #
126/// # use pyo3_async_runtimes::{
127/// # TaskLocals,
128/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
129/// # };
130/// #
131/// # struct MyCustomJoinError;
132/// #
133/// # impl JoinError for MyCustomJoinError {
134/// # fn is_panic(&self) -> bool {
135/// # unreachable!()
136/// # }
137/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
138/// # unreachable!()
139/// # }
140/// # }
141/// #
142/// # struct MyCustomJoinHandle;
143/// #
144/// # impl Future for MyCustomJoinHandle {
145/// # type Output = Result<(), MyCustomJoinError>;
146/// #
147/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
148/// # unreachable!()
149/// # }
150/// # }
151/// #
152/// # struct MyCustomRuntime;
153/// #
154/// # impl Runtime for MyCustomRuntime {
155/// # type JoinError = MyCustomJoinError;
156/// # type JoinHandle = MyCustomJoinHandle;
157/// #
158/// # fn spawn<F>(fut: F) -> Self::JoinHandle
159/// # where
160/// # F: Future<Output = ()> + Send + 'static
161/// # {
162/// # unreachable!()
163/// # }
164/// # }
165/// #
166/// # impl ContextExt for MyCustomRuntime {
167/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
168/// # where
169/// # F: Future<Output = R> + Send + 'static
170/// # {
171/// # unreachable!()
172/// # }
173/// # fn get_task_locals() -> Option<TaskLocals> {
174/// # unreachable!()
175/// # }
176/// # }
177/// #
178/// # use std::time::Duration;
179/// #
180/// # use pyo3::prelude::*;
181/// #
182/// # Python::attach(|py| -> PyResult<()> {
183/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
184/// # #[cfg(feature = "tokio-runtime")]
185/// pyo3_async_runtimes::generic::run_until_complete::<MyCustomRuntime, _, _>(&event_loop, async move {
186/// tokio::time::sleep(Duration::from_secs(1)).await;
187/// Ok(())
188/// })?;
189/// # Ok(())
190/// # }).unwrap();
191/// ```
192pub fn run_until_complete<R, F, T>(event_loop: &Bound<PyAny>, fut: F) -> PyResult<T>
193where
194 R: Runtime + ContextExt,
195 F: Future<Output = PyResult<T>> + Send + 'static,
196 T: Send + Sync + 'static,
197{
198 let py = event_loop.py();
199 let result_tx = Arc::new(Mutex::new(None));
200 let result_rx = Arc::clone(&result_tx);
201 let coro = future_into_py_with_locals::<R, _, ()>(
202 py,
203 TaskLocals::new(event_loop.clone()).copy_context(py)?,
204 async move {
205 let val = fut.await?;
206 if let Ok(mut result) = result_tx.lock() {
207 *result = Some(val);
208 }
209 Ok(())
210 },
211 )?;
212
213 event_loop.call_method1("run_until_complete", (coro,))?;
214
215 let result = result_rx.lock().unwrap().take().unwrap();
216 Ok(result)
217}
218
219/// Run the event loop until the given Future completes
220///
221/// # Arguments
222/// * `py` - The current PyO3 GIL guard
223/// * `fut` - The future to drive to completion
224///
225/// # Examples
226///
227/// ```no_run
228/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
229/// #
230/// # use pyo3_async_runtimes::{
231/// # TaskLocals,
232/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
233/// # };
234/// #
235/// # struct MyCustomJoinError;
236/// #
237/// # impl JoinError for MyCustomJoinError {
238/// # fn is_panic(&self) -> bool {
239/// # unreachable!()
240/// # }
241/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
242/// # unreachable!()
243/// # }
244/// # }
245/// #
246/// # struct MyCustomJoinHandle;
247/// #
248/// # impl Future for MyCustomJoinHandle {
249/// # type Output = Result<(), MyCustomJoinError>;
250/// #
251/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
252/// # unreachable!()
253/// # }
254/// # }
255/// #
256/// # struct MyCustomRuntime;
257/// #
258/// # impl Runtime for MyCustomRuntime {
259/// # type JoinError = MyCustomJoinError;
260/// # type JoinHandle = MyCustomJoinHandle;
261/// #
262/// # fn spawn<F>(fut: F) -> Self::JoinHandle
263/// # where
264/// # F: Future<Output = ()> + Send + 'static
265/// # {
266/// # unreachable!()
267/// # }
268/// # }
269/// #
270/// # impl ContextExt for MyCustomRuntime {
271/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
272/// # where
273/// # F: Future<Output = R> + Send + 'static
274/// # {
275/// # unreachable!()
276/// # }
277/// # fn get_task_locals() -> Option<TaskLocals> {
278/// # unreachable!()
279/// # }
280/// # }
281/// #
282/// # use std::time::Duration;
283/// # async fn custom_sleep(_duration: Duration) { }
284/// #
285/// # use pyo3::prelude::*;
286/// #
287/// fn main() {
288/// Python::attach(|py| {
289/// pyo3_async_runtimes::generic::run::<MyCustomRuntime, _, _>(py, async move {
290/// custom_sleep(Duration::from_secs(1)).await;
291/// Ok(())
292/// })
293/// .map_err(|e| {
294/// e.print_and_set_sys_last_vars(py);
295/// })
296/// .unwrap();
297/// })
298/// }
299/// ```
300pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
301where
302 R: Runtime + ContextExt,
303 F: Future<Output = PyResult<T>> + Send + 'static,
304 T: Send + Sync + 'static,
305{
306 let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
307
308 let result = run_until_complete::<R, F, T>(&event_loop, fut);
309
310 close(event_loop)?;
311
312 result
313}
314
315fn cancelled(future: &Bound<PyAny>) -> PyResult<bool> {
316 future.getattr("cancelled")?.call0()?.is_truthy()
317}
318
319#[pyclass]
320struct CheckedCompletor;
321
322#[pymethods]
323impl CheckedCompletor {
324 fn __call__(
325 &self,
326 future: &Bound<PyAny>,
327 complete: &Bound<PyAny>,
328 value: &Bound<PyAny>,
329 ) -> PyResult<()> {
330 if cancelled(future)? {
331 return Ok(());
332 }
333
334 complete.call1((value,))?;
335
336 Ok(())
337 }
338}
339
340fn set_result(
341 event_loop: &Bound<PyAny>,
342 future: &Bound<PyAny>,
343 result: PyResult<Py<PyAny>>,
344) -> PyResult<()> {
345 let py = event_loop.py();
346 let none = py.None().into_bound(py);
347
348 let (complete, val) = match result {
349 Ok(val) => (future.getattr("set_result")?, val.into_pyobject(py)?),
350 Err(err) => (future.getattr("set_exception")?, err.into_bound_py_any(py)?),
351 };
352 call_soon_threadsafe(event_loop, &none, (CheckedCompletor, future, complete, val))?;
353
354 Ok(())
355}
356
357/// Convert a Python `awaitable` into a Rust Future
358///
359/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
360/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
361/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
362///
363/// # Arguments
364/// * `awaitable` - The Python `awaitable` to be converted
365///
366/// # Examples
367///
368/// ```no_run
369/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
370/// # use std::ffi::CString;
371/// #
372/// # use pyo3::prelude::*;
373/// #
374/// # use pyo3_async_runtimes::{
375/// # TaskLocals,
376/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
377/// # };
378/// #
379/// # struct MyCustomJoinError;
380/// #
381/// # impl JoinError for MyCustomJoinError {
382/// # fn is_panic(&self) -> bool {
383/// # unreachable!()
384/// # }
385/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
386/// # unreachable!()
387/// # }
388/// # }
389/// #
390/// # struct MyCustomJoinHandle;
391/// #
392/// # impl Future for MyCustomJoinHandle {
393/// # type Output = Result<(), MyCustomJoinError>;
394/// #
395/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
396/// # unreachable!()
397/// # }
398/// # }
399/// #
400/// # struct MyCustomRuntime;
401/// #
402/// # impl MyCustomRuntime {
403/// # async fn sleep(_: Duration) {
404/// # unreachable!()
405/// # }
406/// # }
407/// #
408/// # impl Runtime for MyCustomRuntime {
409/// # type JoinError = MyCustomJoinError;
410/// # type JoinHandle = MyCustomJoinHandle;
411/// #
412/// # fn spawn<F>(fut: F) -> Self::JoinHandle
413/// # where
414/// # F: Future<Output = ()> + Send + 'static
415/// # {
416/// # unreachable!()
417/// # }
418/// # }
419/// #
420/// # impl ContextExt for MyCustomRuntime {
421/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
422/// # where
423/// # F: Future<Output = R> + Send + 'static
424/// # {
425/// # unreachable!()
426/// # }
427/// # fn get_task_locals() -> Option<TaskLocals> {
428/// # unreachable!()
429/// # }
430/// # }
431/// #
432/// const PYTHON_CODE: &'static str = r#"
433/// import asyncio
434///
435/// async def py_sleep(duration):
436/// await asyncio.sleep(duration)
437/// "#;
438///
439/// async fn py_sleep(seconds: f32) -> PyResult<()> {
440/// let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
441/// Ok(
442/// PyModule::from_code(
443/// py,
444/// &CString::new(PYTHON_CODE).unwrap(),
445/// &CString::new("test_into_future/test_mod.py").unwrap(),
446/// &CString::new("test_mod").unwrap(),
447/// )?
448/// .into()
449/// )
450/// })?;
451///
452/// Python::attach(|py| {
453/// pyo3_async_runtimes::generic::into_future::<MyCustomRuntime>(
454/// test_mod
455/// .call_method1(py, "py_sleep", (seconds,))?
456/// .into_bound(py),
457/// )
458/// })?
459/// .await?;
460/// Ok(())
461/// }
462/// ```
463pub fn into_future<R>(
464 awaitable: Bound<PyAny>,
465) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send>
466where
467 R: Runtime + ContextExt,
468{
469 into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
470}
471
472/// Convert a Rust Future into a Python awaitable with a generic runtime
473///
474/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
475/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
476///
477/// Python `contextvars` are preserved when calling async Python functions within the Rust future
478/// via [`into_future`] (new behaviour in `v0.15`).
479///
480/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
481/// > unfortunately fail to resolve them when called within the Rust future. This is because the
482/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
483/// >
484/// > As a workaround, you can get the `contextvars` from the current task locals using
485/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
486/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
487/// > synchronous function, and restore the previous context when it returns or raises an exception.
488///
489/// # Arguments
490/// * `py` - PyO3 GIL guard
491/// * `locals` - The task-local data for Python
492/// * `fut` - The Rust future to be converted
493///
494/// # Examples
495///
496/// ```no_run
497/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
498/// #
499/// # use pyo3_async_runtimes::{
500/// # TaskLocals,
501/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
502/// # };
503/// #
504/// # struct MyCustomJoinError;
505/// #
506/// # impl JoinError for MyCustomJoinError {
507/// # fn is_panic(&self) -> bool {
508/// # unreachable!()
509/// # }
510/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
511/// # unreachable!()
512/// # }
513/// # }
514/// #
515/// # struct MyCustomJoinHandle;
516/// #
517/// # impl Future for MyCustomJoinHandle {
518/// # type Output = Result<(), MyCustomJoinError>;
519/// #
520/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
521/// # unreachable!()
522/// # }
523/// # }
524/// #
525/// # struct MyCustomRuntime;
526/// #
527/// # impl MyCustomRuntime {
528/// # async fn sleep(_: Duration) {
529/// # unreachable!()
530/// # }
531/// # }
532/// #
533/// # impl Runtime for MyCustomRuntime {
534/// # type JoinError = MyCustomJoinError;
535/// # type JoinHandle = MyCustomJoinHandle;
536/// #
537/// # fn spawn<F>(fut: F) -> Self::JoinHandle
538/// # where
539/// # F: Future<Output = ()> + Send + 'static
540/// # {
541/// # unreachable!()
542/// # }
543/// # }
544/// #
545/// # impl ContextExt for MyCustomRuntime {
546/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
547/// # where
548/// # F: Future<Output = R> + Send + 'static
549/// # {
550/// # unreachable!()
551/// # }
552/// # fn get_task_locals() -> Option<TaskLocals> {
553/// # unreachable!()
554/// # }
555/// # }
556/// #
557/// use std::time::Duration;
558///
559/// use pyo3::prelude::*;
560///
561/// /// Awaitable sleep function
562/// #[pyfunction]
563/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
564/// let secs = secs.extract()?;
565/// pyo3_async_runtimes::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
566/// py,
567/// pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
568/// async move {
569/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
570/// Ok(())
571/// }
572/// )
573/// }
574/// ```
575#[allow(unused_must_use)]
576pub fn future_into_py_with_locals<R, F, T>(
577 py: Python,
578 locals: TaskLocals,
579 fut: F,
580) -> PyResult<Bound<PyAny>>
581where
582 R: Runtime + ContextExt,
583 F: Future<Output = PyResult<T>> + Send + 'static,
584 T: for<'py> IntoPyObject<'py>,
585{
586 let (cancel_tx, cancel_rx) = oneshot::channel();
587
588 let py_fut = create_future(locals.event_loop.bind(py).clone())?;
589 py_fut.call_method1(
590 "add_done_callback",
591 (PyDoneCallback {
592 cancel_tx: Some(cancel_tx),
593 },),
594 )?;
595
596 let future_tx1: Py<PyAny> = py_fut.clone().into();
597 let future_tx2 = future_tx1.clone_ref(py);
598
599 R::spawn(async move {
600 let locals2 = Python::attach(|py| locals.clone_ref(py));
601
602 if let Err(e) = R::spawn(async move {
603 let result = R::scope(
604 Python::attach(|py| locals2.clone_ref(py)),
605 Cancellable::new_with_cancel_rx(fut, cancel_rx),
606 )
607 .await;
608
609 Python::attach(move |py| {
610 if cancelled(future_tx1.bind(py))
611 .map_err(dump_err(py))
612 .unwrap_or(false)
613 {
614 return;
615 }
616
617 let _ = set_result(
618 &locals2.event_loop(py),
619 future_tx1.bind(py),
620 result.and_then(|val| val.into_py_any(py)),
621 )
622 .map_err(dump_err(py));
623 });
624 })
625 .await
626 {
627 if e.is_panic() {
628 Python::attach(move |py| {
629 if cancelled(future_tx2.bind(py))
630 .map_err(dump_err(py))
631 .unwrap_or(false)
632 {
633 return;
634 }
635
636 let panic_message = format!(
637 "rust future panicked: {}",
638 get_panic_message(&e.into_panic())
639 );
640 let _ = set_result(
641 locals.event_loop.bind(py),
642 future_tx2.bind(py),
643 Err(RustPanic::new_err(panic_message)),
644 )
645 .map_err(dump_err(py));
646 });
647 }
648 }
649 });
650
651 Ok(py_fut)
652}
653
/// Extract a human-readable message from a panic payload.
///
/// Handles the payload types used by `panic!` (`&str` and `String`). A payload that is still
/// wrapped in a `Box<dyn Any + Send>` is unwrapped first: a `&Box<dyn Any + Send>` coerces to
/// `&dyn Any` with the *box* as the concrete type, so without this step the downcasts below
/// would never match. Any other payload type yields `"unknown error"`.
fn get_panic_message(any: &dyn std::any::Any) -> &str {
    if let Some(str_slice) = any.downcast_ref::<&str>() {
        str_slice
    } else if let Some(string) = any.downcast_ref::<String>() {
        string.as_str()
    } else if let Some(boxed) = any.downcast_ref::<Box<dyn std::any::Any + Send>>() {
        // Look inside the box and retry with the real payload.
        get_panic_message(&**boxed)
    } else {
        "unknown error"
    }
}
663
664pin_project! {
665 /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
666 #[must_use = "futures do nothing unless you `.await` or poll them"]
667 #[derive(Debug)]
668 struct Cancellable<T> {
669 #[pin]
670 future: T,
671 #[pin]
672 cancel_rx: oneshot::Receiver<()>,
673
674 poll_cancel_rx: bool
675 }
676}
677
678impl<T> Cancellable<T> {
679 fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
680 Self {
681 future,
682 cancel_rx,
683
684 poll_cancel_rx: true,
685 }
686 }
687}
688
689impl<'py, F, T> Future for Cancellable<F>
690where
691 F: Future<Output = PyResult<T>>,
692 T: IntoPyObject<'py>,
693{
694 type Output = F::Output;
695
696 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
697 let this = self.project();
698
699 // First, try polling the future
700 if let Poll::Ready(v) = this.future.poll(cx) {
701 return Poll::Ready(v);
702 }
703
704 // Now check for cancellation
705 if *this.poll_cancel_rx {
706 match this.cancel_rx.poll(cx) {
707 Poll::Ready(Ok(())) => {
708 *this.poll_cancel_rx = false;
709 // The python future has already been cancelled, so this return value will never
710 // be used.
711 Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
712 "unreachable",
713 )))
714 }
715 Poll::Ready(Err(_)) => {
716 *this.poll_cancel_rx = false;
717 Poll::Pending
718 }
719 Poll::Pending => Poll::Pending,
720 }
721 } else {
722 Poll::Pending
723 }
724 }
725}
726
727#[pyclass]
728struct PyDoneCallback {
729 cancel_tx: Option<oneshot::Sender<()>>,
730}
731
732#[pymethods]
733impl PyDoneCallback {
734 pub fn __call__(&mut self, fut: &Bound<PyAny>) -> PyResult<()> {
735 let py = fut.py();
736
737 if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
738 let _ = self.cancel_tx.take().unwrap().send(());
739 }
740
741 Ok(())
742 }
743}
744
745/// Convert a Rust Future into a Python awaitable with a generic runtime
746///
747/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
748/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
749///
750/// Python `contextvars` are preserved when calling async Python functions within the Rust future
751/// via [`into_future`] (new behaviour in `v0.15`).
752///
753/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
754/// > unfortunately fail to resolve them when called within the Rust future. This is because the
755/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
756/// >
757/// > As a workaround, you can get the `contextvars` from the current task locals using
758/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
759/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
760/// > synchronous function, and restore the previous context when it returns or raises an exception.
761///
762/// # Arguments
763/// * `py` - The current PyO3 GIL guard
764/// * `fut` - The Rust future to be converted
765///
766/// # Examples
767///
768/// ```no_run
769/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
770/// #
771/// # use pyo3_async_runtimes::{
772/// # TaskLocals,
773/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
774/// # };
775/// #
776/// # struct MyCustomJoinError;
777/// #
778/// # impl JoinError for MyCustomJoinError {
779/// # fn is_panic(&self) -> bool {
780/// # unreachable!()
781/// # }
782/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
783/// # unreachable!()
784/// # }
785/// # }
786/// #
787/// # struct MyCustomJoinHandle;
788/// #
789/// # impl Future for MyCustomJoinHandle {
790/// # type Output = Result<(), MyCustomJoinError>;
791/// #
792/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
793/// # unreachable!()
794/// # }
795/// # }
796/// #
797/// # struct MyCustomRuntime;
798/// #
799/// # impl MyCustomRuntime {
800/// # async fn sleep(_: Duration) {
801/// # unreachable!()
802/// # }
803/// # }
804/// #
805/// # impl Runtime for MyCustomRuntime {
806/// # type JoinError = MyCustomJoinError;
807/// # type JoinHandle = MyCustomJoinHandle;
808/// #
809/// # fn spawn<F>(fut: F) -> Self::JoinHandle
810/// # where
811/// # F: Future<Output = ()> + Send + 'static
812/// # {
813/// # unreachable!()
814/// # }
815/// # }
816/// #
817/// # impl ContextExt for MyCustomRuntime {
818/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
819/// # where
820/// # F: Future<Output = R> + Send + 'static
821/// # {
822/// # unreachable!()
823/// # }
824/// # fn get_task_locals() -> Option<TaskLocals> {
825/// # unreachable!()
826/// # }
827/// # }
828/// #
829/// use std::time::Duration;
830///
831/// use pyo3::prelude::*;
832///
833/// /// Awaitable sleep function
834/// #[pyfunction]
835/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
836/// let secs = secs.extract()?;
837/// pyo3_async_runtimes::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
838/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
839/// Ok(())
840/// })
841/// }
842/// ```
843pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
844where
845 R: Runtime + ContextExt,
846 F: Future<Output = PyResult<T>> + Send + 'static,
847 T: for<'py> IntoPyObject<'py>,
848{
849 future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
850}
851
852/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
853/// specification of task locals.
854///
855/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
856/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
857///
858/// Python `contextvars` are preserved when calling async Python functions within the Rust future
859/// via [`into_future`] (new behaviour in `v0.15`).
860///
861/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
862/// > unfortunately fail to resolve them when called within the Rust future. This is because the
863/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
864/// >
865/// > As a workaround, you can get the `contextvars` from the current task locals using
866/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
867/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
868/// > synchronous function, and restore the previous context when it returns or raises an exception.
869///
870/// # Arguments
871/// * `py` - PyO3 GIL guard
872/// * `locals` - The task locals for the future
873/// * `fut` - The Rust future to be converted
874///
875/// # Examples
876///
877/// ```no_run
878/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
879/// #
880/// # use pyo3_async_runtimes::{
881/// # TaskLocals,
882/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
883/// # };
884/// #
885/// # struct MyCustomJoinError;
886/// #
887/// # impl JoinError for MyCustomJoinError {
888/// # fn is_panic(&self) -> bool {
889/// # unreachable!()
890/// # }
891/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
892/// # unreachable!()
893/// # }
894/// # }
895/// #
896/// # struct MyCustomJoinHandle;
897/// #
898/// # impl Future for MyCustomJoinHandle {
899/// # type Output = Result<(), MyCustomJoinError>;
900/// #
901/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
902/// # unreachable!()
903/// # }
904/// # }
905/// #
906/// # struct MyCustomRuntime;
907/// #
908/// # impl MyCustomRuntime {
909/// # async fn sleep(_: Duration) {
910/// # unreachable!()
911/// # }
912/// # }
913/// #
914/// # impl Runtime for MyCustomRuntime {
915/// # type JoinError = MyCustomJoinError;
916/// # type JoinHandle = MyCustomJoinHandle;
917/// #
918/// # fn spawn<F>(fut: F) -> Self::JoinHandle
919/// # where
920/// # F: Future<Output = ()> + Send + 'static
921/// # {
922/// # unreachable!()
923/// # }
924/// # }
925/// #
926/// # impl ContextExt for MyCustomRuntime {
927/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
928/// # where
929/// # F: Future<Output = R> + Send + 'static
930/// # {
931/// # unreachable!()
932/// # }
933/// # fn get_task_locals() -> Option<TaskLocals> {
934/// # unreachable!()
935/// # }
936/// # }
937/// #
938/// # impl SpawnLocalExt for MyCustomRuntime {
939/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
940/// # where
941/// # F: Future<Output = ()> + 'static
942/// # {
943/// # unreachable!()
944/// # }
945/// # }
946/// #
947/// # impl LocalContextExt for MyCustomRuntime {
948/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
949/// # where
950/// # F: Future<Output = R> + 'static
951/// # {
952/// # unreachable!()
953/// # }
954/// # }
955/// #
956/// use std::{rc::Rc, time::Duration};
957///
958/// use pyo3::prelude::*;
959///
960/// /// Awaitable sleep function
961/// #[pyfunction]
962/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
963/// // Rc is !Send so it cannot be passed into pyo3_async_runtimes::generic::future_into_py
964/// let secs = Rc::new(secs);
965///
966/// pyo3_async_runtimes::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
967/// py,
968/// pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
969/// async move {
970/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
971/// Ok(())
972/// }
973/// )
974/// }
975/// ```
976#[deprecated(
977 since = "0.18.0",
978 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
979)]
980#[allow(unused_must_use)]
981pub fn local_future_into_py_with_locals<R, F, T>(
982 py: Python,
983 locals: TaskLocals,
984 fut: F,
985) -> PyResult<Bound<PyAny>>
986where
987 R: Runtime + SpawnLocalExt + LocalContextExt,
988 F: Future<Output = PyResult<T>> + 'static,
989 T: for<'py> IntoPyObject<'py>,
990{
991 let (cancel_tx, cancel_rx) = oneshot::channel();
992
993 let py_fut = create_future(locals.event_loop.clone_ref(py).into_bound(py))?;
994 py_fut.call_method1(
995 "add_done_callback",
996 (PyDoneCallback {
997 cancel_tx: Some(cancel_tx),
998 },),
999 )?;
1000
1001 let future_tx1: Py<PyAny> = py_fut.clone().into();
1002 let future_tx2 = future_tx1.clone_ref(py);
1003
1004 R::spawn_local(async move {
1005 let locals2 = Python::attach(|py| locals.clone_ref(py));
1006
1007 if let Err(e) = R::spawn_local(async move {
1008 let result = R::scope_local(
1009 Python::attach(|py| locals2.clone_ref(py)),
1010 Cancellable::new_with_cancel_rx(fut, cancel_rx),
1011 )
1012 .await;
1013
1014 Python::attach(move |py| {
1015 if cancelled(future_tx1.bind(py))
1016 .map_err(dump_err(py))
1017 .unwrap_or(false)
1018 {
1019 return;
1020 }
1021
1022 let _ = set_result(
1023 locals2.event_loop.bind(py),
1024 future_tx1.bind(py),
1025 result.and_then(|val| val.into_py_any(py)),
1026 )
1027 .map_err(dump_err(py));
1028 });
1029 })
1030 .await
1031 {
1032 if e.is_panic() {
1033 Python::attach(move |py| {
1034 if cancelled(future_tx2.bind(py))
1035 .map_err(dump_err(py))
1036 .unwrap_or(false)
1037 {
1038 return;
1039 }
1040
1041 let panic_message = format!(
1042 "rust future panicked: {}",
1043 get_panic_message(&e.into_panic())
1044 );
1045 let _ = set_result(
1046 locals.event_loop.bind(py),
1047 future_tx2.bind(py),
1048 Err(RustPanic::new_err(panic_message)),
1049 )
1050 .map_err(dump_err(py));
1051 });
1052 }
1053 }
1054 });
1055
1056 Ok(py_fut)
1057}
1058
1059/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1060///
1061/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1062/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1063///
1064/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1065/// via [`into_future`] (new behaviour in `v0.15`).
1066///
1067/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
1068/// > unfortunately fail to resolve them when called within the Rust future. This is because the
1069/// > function is being called from a Rust thread, not inside an actual Python coroutine context.
1070/// >
1071/// > As a workaround, you can get the `contextvars` from the current task locals using
1072/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
1073/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
1074/// > synchronous function, and restore the previous context when it returns or raises an exception.
1075///
1076/// # Arguments
1077/// * `py` - The current PyO3 GIL guard
1078/// * `fut` - The Rust future to be converted
1079///
1080/// # Examples
1081///
1082/// ```no_run
1083/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1084/// #
1085/// # use pyo3_async_runtimes::{
1086/// # TaskLocals,
1087/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1088/// # };
1089/// #
1090/// # struct MyCustomJoinError;
1091/// #
1092/// # impl JoinError for MyCustomJoinError {
1093/// # fn is_panic(&self) -> bool {
1094/// # unreachable!()
1095/// # }
1096/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1097/// # unreachable!()
1098/// # }
1099/// # }
1100/// #
1101/// # struct MyCustomJoinHandle;
1102/// #
1103/// # impl Future for MyCustomJoinHandle {
1104/// # type Output = Result<(), MyCustomJoinError>;
1105/// #
1106/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1107/// # unreachable!()
1108/// # }
1109/// # }
1110/// #
1111/// # struct MyCustomRuntime;
1112/// #
1113/// # impl MyCustomRuntime {
1114/// # async fn sleep(_: Duration) {
1115/// # unreachable!()
1116/// # }
1117/// # }
1118/// #
1119/// # impl Runtime for MyCustomRuntime {
1120/// # type JoinError = MyCustomJoinError;
1121/// # type JoinHandle = MyCustomJoinHandle;
1122/// #
1123/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1124/// # where
1125/// # F: Future<Output = ()> + Send + 'static
1126/// # {
1127/// # unreachable!()
1128/// # }
1129/// # }
1130/// #
1131/// # impl ContextExt for MyCustomRuntime {
1132/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1133/// # where
1134/// # F: Future<Output = R> + Send + 'static
1135/// # {
1136/// # unreachable!()
1137/// # }
1138/// # fn get_task_locals() -> Option<TaskLocals> {
1139/// # unreachable!()
1140/// # }
1141/// # }
1142/// #
1143/// # impl SpawnLocalExt for MyCustomRuntime {
1144/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
1145/// # where
1146/// # F: Future<Output = ()> + 'static
1147/// # {
1148/// # unreachable!()
1149/// # }
1150/// # }
1151/// #
1152/// # impl LocalContextExt for MyCustomRuntime {
1153/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1154/// # where
1155/// # F: Future<Output = R> + 'static
1156/// # {
1157/// # unreachable!()
1158/// # }
1159/// # }
1160/// #
1161/// use std::{rc::Rc, time::Duration};
1162///
1163/// use pyo3::prelude::*;
1164///
1165/// /// Awaitable sleep function
1166/// #[pyfunction]
1167/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
1168/// // Rc is !Send so it cannot be passed into pyo3_async_runtimes::generic::future_into_py
1169/// let secs = Rc::new(secs);
1170///
1171/// pyo3_async_runtimes::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1172/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1173/// Ok(())
1174/// })
1175/// }
1176/// ```
1177#[deprecated(
1178 since = "0.18.0",
1179 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1180)]
1181#[allow(deprecated)]
1182pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
1183where
1184 R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1185 F: Future<Output = PyResult<T>> + 'static,
1186 T: for<'py> IntoPyObject<'py>,
1187{
1188 local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1189}
1190
1191/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1192///
1193/// **This API is marked as unstable** and is only available when the
1194/// `unstable-streams` crate feature is enabled. This comes with no
1195/// stability guarantees, and could be changed or removed at any time.
1196///
1197/// # Arguments
1198/// * `locals` - The current task locals
1199/// * `gen` - The Python async generator to be converted
1200///
1201/// # Examples
1202/// ```no_run
1203/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1204/// #
1205/// # use pyo3_async_runtimes::{
1206/// # TaskLocals,
1207/// # generic::{JoinError, ContextExt, Runtime}
1208/// # };
1209/// #
1210/// # struct MyCustomJoinError;
1211/// #
1212/// # impl JoinError for MyCustomJoinError {
1213/// # fn is_panic(&self) -> bool {
1214/// # unreachable!()
1215/// # }
1216/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1217/// # unreachable!()
1218/// # }
1219/// # }
1220/// #
1221/// # struct MyCustomJoinHandle;
1222/// #
1223/// # impl Future for MyCustomJoinHandle {
1224/// # type Output = Result<(), MyCustomJoinError>;
1225/// #
1226/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1227/// # unreachable!()
1228/// # }
1229/// # }
1230/// #
1231/// # struct MyCustomRuntime;
1232/// #
1233/// # impl Runtime for MyCustomRuntime {
1234/// # type JoinError = MyCustomJoinError;
1235/// # type JoinHandle = MyCustomJoinHandle;
1236/// #
1237/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1238/// # where
1239/// # F: Future<Output = ()> + Send + 'static
1240/// # {
1241/// # unreachable!()
1242/// # }
1243/// # }
1244/// #
1245/// # impl ContextExt for MyCustomRuntime {
1246/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1247/// # where
1248/// # F: Future<Output = R> + Send + 'static
1249/// # {
1250/// # unreachable!()
1251/// # }
1252/// # fn get_task_locals() -> Option<TaskLocals> {
1253/// # unreachable!()
1254/// # }
1255/// # }
1256///
1257/// use pyo3::prelude::*;
1258/// use futures::{StreamExt, TryStreamExt};
1259/// use std::ffi::CString;
1260///
1261/// const TEST_MOD: &str = r#"
1262/// import asyncio
1263///
1264/// async def gen():
1265/// for i in range(10):
1266/// await asyncio.sleep(0.1)
1267/// yield i
1268/// "#;
1269///
1270/// # async fn test_async_gen() -> PyResult<()> {
1271/// let stream = Python::attach(|py| {
1272/// let test_mod = PyModule::from_code(
1273/// py,
1274/// &CString::new(TEST_MOD).unwrap(),
1275/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1276/// &CString::new("test_mod").unwrap(),
1277/// )?;
1278///
1279/// pyo3_async_runtimes::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1280/// pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
1281/// test_mod.call_method0("gen")?
1282/// )
1283/// })?;
1284///
1285/// let vals = stream
1286/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1287/// .try_collect::<Vec<i32>>()
1288/// .await?;
1289///
1290/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1291///
1292/// Ok(())
1293/// # }
1294/// ```
#[cfg(feature = "unstable-streams")]
#[allow(unused_must_use)] // False positive unused lint on `R::spawn`
pub fn into_stream_with_locals_v1<R>(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
    R: Runtime,
{
    // Single-slot channel: the spawned task produces one item at a time and
    // waits for the consumer before asking the generator for the next one.
    let (item_tx, item_rx) = async_channel::bounded(1);
    // Resolve `__anext__` once; the task below calls it on every iteration.
    let next_method: Py<PyAny> = gen.getattr("__anext__")?.into();

    R::spawn(async move {
        loop {
            // Call `__anext__()` with the interpreter attached and convert the
            // returned awaitable into a Rust future bound to `locals`.
            let pending = Python::attach(|py| -> PyResult<_> {
                let awaitable = next_method.bind(py).call0()?;
                into_future_with_locals(&locals, awaitable)
            });

            let item = match pending {
                // Failing to even start the step is forwarded as an error item.
                Err(call_err) => Err(call_err),
                Ok(next_item) => match next_item.await {
                    // `StopAsyncIteration` marks the end of the generator.
                    Err(err)
                        if Python::attach(|py| {
                            err.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
                        }) =>
                    {
                        break
                    }
                    // Yielded values and every other exception go to the stream.
                    awaited => awaited,
                },
            };

            // A send error means the receiving half was dropped; stop producing.
            if item_tx.send(item).await.is_err() {
                break;
            }
        }
    });

    Ok(item_rx)
}
1340
1341/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1342///
1343/// **This API is marked as unstable** and is only available when the
1344/// `unstable-streams` crate feature is enabled. This comes with no
1345/// stability guarantees, and could be changed or removed at any time.
1346///
1347/// # Arguments
1348/// * `gen` - The Python async generator to be converted
1349///
1350/// # Examples
1351/// ```no_run
1352/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1353/// #
1354/// # use pyo3_async_runtimes::{
1355/// # TaskLocals,
1356/// # generic::{JoinError, ContextExt, Runtime}
1357/// # };
1358/// #
1359/// # struct MyCustomJoinError;
1360/// #
1361/// # impl JoinError for MyCustomJoinError {
1362/// # fn is_panic(&self) -> bool {
1363/// # unreachable!()
1364/// # }
1365/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1366/// # unreachable!()
1367/// # }
1368/// # }
1369/// #
1370/// # struct MyCustomJoinHandle;
1371/// #
1372/// # impl Future for MyCustomJoinHandle {
1373/// # type Output = Result<(), MyCustomJoinError>;
1374/// #
1375/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1376/// # unreachable!()
1377/// # }
1378/// # }
1379/// #
1380/// # struct MyCustomRuntime;
1381/// #
1382/// # impl Runtime for MyCustomRuntime {
1383/// # type JoinError = MyCustomJoinError;
1384/// # type JoinHandle = MyCustomJoinHandle;
1385/// #
1386/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1387/// # where
1388/// # F: Future<Output = ()> + Send + 'static
1389/// # {
1390/// # unreachable!()
1391/// # }
1392/// # }
1393/// #
1394/// # impl ContextExt for MyCustomRuntime {
1395/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1396/// # where
1397/// # F: Future<Output = R> + Send + 'static
1398/// # {
1399/// # unreachable!()
1400/// # }
1401/// # fn get_task_locals() -> Option<TaskLocals> {
1402/// # unreachable!()
1403/// # }
1404/// # }
1405///
1406/// use pyo3::prelude::*;
1407/// use futures::{StreamExt, TryStreamExt};
1408/// use std::ffi::CString;
1409///
1410/// const TEST_MOD: &str = r#"
1411/// import asyncio
1412///
1413/// async def gen():
1414/// for i in range(10):
1415/// await asyncio.sleep(0.1)
1416/// yield i
1417/// "#;
1418///
1419/// # async fn test_async_gen() -> PyResult<()> {
1420/// let stream = Python::attach(|py| {
1421/// let test_mod = PyModule::from_code(
1422/// py,
1423/// &CString::new(TEST_MOD).unwrap(),
1424/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1425/// &CString::new("test_mod").unwrap(),
1426/// )?;
1427///
1428/// pyo3_async_runtimes::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1429/// })?;
1430///
1431/// let vals = stream
1432/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1433/// .try_collect::<Vec<i32>>()
1434/// .await?;
1435///
1436/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1437///
1438/// Ok(())
1439/// # }
1440/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<R>(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
    R: Runtime + ContextExt,
{
    // The generator already carries a Python token; use it to look up the
    // current task locals, then defer to the explicit-locals variant.
    let py = gen.py();
    let current_locals = get_current_locals::<R>(py)?;

    into_stream_with_locals_v1::<R>(current_locals, gen)
}
1450
1451trait Sender: Send + 'static {
1452 fn send(&mut self, py: Python, locals: TaskLocals, item: Py<PyAny>) -> PyResult<Py<PyAny>>;
1453 fn close(&mut self) -> PyResult<()>;
1454}
1455
/// `Sender` implementation backed by a `futures` mpsc channel, generic over the
/// runtime `R` that is used to run slow-path sends.
#[cfg(feature = "unstable-streams")]
struct GenericSender<R: Runtime> {
    /// Ties the sender to a runtime type without storing a value of it.
    runtime: PhantomData<R>,
    /// Sending half of the channel that carries the Python items.
    tx: mpsc::Sender<Py<PyAny>>,
}
1464
#[cfg(feature = "unstable-streams")]
impl<R> Sender for GenericSender<R>
where
    R: Runtime + ContextExt,
{
    /// Try to enqueue `item` without waiting.
    ///
    /// Returns Python `True` when the item was queued immediately, Python
    /// `False` when the send failed for a reason other than a full channel, and
    /// otherwise an awaitable (built by `future_into_py_with_locals`) that
    /// resolves to a `bool` once the delayed send has finished or failed.
    fn send(&mut self, py: Python, locals: TaskLocals, item: Py<PyAny>) -> PyResult<Py<PyAny>> {
        // Fast path: a clone is offered so `item` stays available for the slow path.
        let Err(send_err) = self.tx.try_send(item.clone_ref(py)) else {
            return true.into_py_any(py);
        };

        // Any failure other than "full" is reported as "stop".
        if !send_err.is_full() {
            return false.into_py_any(py);
        }

        // Slow path: wait for capacity on a cloned sender inside a Rust future
        // and expose that future to Python.
        let mut pending_tx = self.tx.clone();
        let awaitable = future_into_py_with_locals::<R, _, bool>(py, locals, async move {
            // Either step failing means the receiving side disconnected.
            let delivered =
                pending_tx.flush().await.is_ok() && pending_tx.send(item).await.is_ok();
            Ok(delivered)
        })?;

        Ok(awaitable.unbind())
    }

    /// Close the underlying channel.
    fn close(&mut self) -> PyResult<()> {
        self.tx.close_channel();
        Ok(())
    }
}
1500
1501#[pyclass]
1502struct SenderGlue {
1503 locals: TaskLocals,
1504 tx: Arc<Mutex<dyn Sender>>,
1505}
1506#[pymethods]
1507impl SenderGlue {
1508 pub fn send(&mut self, item: Py<PyAny>) -> PyResult<Py<PyAny>> {
1509 Python::attach(|py| {
1510 self.tx
1511 .lock()
1512 .unwrap()
1513 .send(py, self.locals.clone_ref(py), item)
1514 })
1515 }
1516 pub fn close(&mut self) -> PyResult<()> {
1517 self.tx.lock().unwrap().close()
1518 }
1519}
1520
1521#[cfg(feature = "unstable-streams")]
// Python source of the helper module that drives an async generator and pushes
// every yielded item into a `SenderGlue`.
//
// `sender.send(item)` returns either a plain bool or, when the channel is full,
// the awaitable produced by `future_into_py_with_locals` (an `asyncio.Future`).
// `asyncio.iscoroutine` is false for futures, so that result was never awaited
// and, being truthy, was treated as "continue". `inspect.isawaitable` accepts
// both coroutines and futures, so the loop now waits for the delayed send and
// sees its real outcome.
//
// `sender.close()` sits in a `finally` block so the channel is also closed when
// the generator raises or the forwarding task is cancelled.
const STREAM_GLUE: &str = r#"
import inspect

async def forward(gen, sender):
    try:
        async for item in gen:
            should_continue = sender.send(item)

            if inspect.isawaitable(should_continue):
                should_continue = await should_continue

            if not should_continue:
                break
    finally:
        sender.close()
"#;
1539
1540/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1541///
1542/// **This API is marked as unstable** and is only available when the
1543/// `unstable-streams` crate feature is enabled. This comes with no
1544/// stability guarantees, and could be changed or removed at any time.
1545///
1546/// # Arguments
1547/// * `locals` - The current task locals
1548/// * `gen` - The Python async generator to be converted
1549///
1550/// # Examples
1551/// ```no_run
1552/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1553/// #
1554/// # use pyo3_async_runtimes::{
1555/// # TaskLocals,
1556/// # generic::{JoinError, ContextExt, Runtime}
1557/// # };
1558/// #
1559/// # struct MyCustomJoinError;
1560/// #
1561/// # impl JoinError for MyCustomJoinError {
1562/// # fn is_panic(&self) -> bool {
1563/// # unreachable!()
1564/// # }
1565/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1566/// # unreachable!()
1567/// # }
1568/// # }
1569/// #
1570/// # struct MyCustomJoinHandle;
1571/// #
1572/// # impl Future for MyCustomJoinHandle {
1573/// # type Output = Result<(), MyCustomJoinError>;
1574/// #
1575/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1576/// # unreachable!()
1577/// # }
1578/// # }
1579/// #
1580/// # struct MyCustomRuntime;
1581/// #
1582/// # impl Runtime for MyCustomRuntime {
1583/// # type JoinError = MyCustomJoinError;
1584/// # type JoinHandle = MyCustomJoinHandle;
1585/// #
1586/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1587/// # where
1588/// # F: Future<Output = ()> + Send + 'static
1589/// # {
1590/// # unreachable!()
1591/// # }
1592/// # }
1593/// #
1594/// # impl ContextExt for MyCustomRuntime {
1595/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1596/// # where
1597/// # F: Future<Output = R> + Send + 'static
1598/// # {
1599/// # unreachable!()
1600/// # }
1601/// # fn get_task_locals() -> Option<TaskLocals> {
1602/// # unreachable!()
1603/// # }
1604/// # }
1605///
1606/// use pyo3::prelude::*;
1607/// use futures::{StreamExt, TryStreamExt};
1608/// use std::ffi::CString;
1609///
1610/// const TEST_MOD: &str = r#"
1611/// import asyncio
1612///
1613/// async def gen():
1614/// for i in range(10):
1615/// await asyncio.sleep(0.1)
1616/// yield i
1617/// "#;
1618///
1619/// # async fn test_async_gen() -> PyResult<()> {
1620/// let stream = Python::attach(|py| {
1621/// let test_mod = PyModule::from_code(
1622/// py,
1623/// &CString::new(TEST_MOD).unwrap(),
1624/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1625/// &CString::new("test_mod").unwrap(),
1626/// )?;
1627///
1628/// pyo3_async_runtimes::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1629/// pyo3_async_runtimes::generic::get_current_locals::<MyCustomRuntime>(py)?,
1630/// test_mod.call_method0("gen")?
1631/// )
1632/// })?;
1633///
1634/// let vals = stream
1635/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1636/// .try_collect::<Vec<i32>>()
1637/// .await?;
1638///
1639/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1640///
1641/// Ok(())
1642/// # }
1643/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_with_locals_v2<R>(
    locals: TaskLocals,
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
where
    R: Runtime + ContextExt,
{
    use std::ffi::CString;

    use pyo3::sync::PyOnceLock;

    // The glue module is compiled from `STREAM_GLUE` once and then reused.
    static GLUE_MOD: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
    let py = gen.py();
    let glue = GLUE_MOD
        .get_or_try_init(py, || -> PyResult<Py<PyAny>> {
            let source = CString::new(STREAM_GLUE).unwrap();
            let file_name =
                CString::new("pyo3_async_runtimes/pyo3_async_runtimes_glue.py").unwrap();
            let module_name = CString::new("pyo3_async_runtimes_glue").unwrap();
            let module = PyModule::from_code(py, &source, &file_name, &module_name)?;
            Ok(module.into())
        })?
        .bind(py);

    // Channel between the Python forwarding coroutine and the returned stream.
    let (tx, rx) = mpsc::channel(10);

    let event_loop = locals.event_loop(py);
    let create_task = event_loop.getattr("create_task")?;

    // Python-side handle through which `forward` pushes items into `tx`.
    let sender = SenderGlue {
        locals,
        tx: Arc::new(Mutex::new(GenericSender {
            runtime: PhantomData::<R>,
            tx,
        })),
    };
    let forward_coro = glue.call_method1("forward", (gen, sender))?;

    // Ask the loop to run `create_task(forward(gen, sender))`.
    // NOTE(review): the Task created by `create_task` is not retained anywhere
    // visible here; confirm the loop keeps it alive for the generator's lifetime.
    event_loop.call_method1("call_soon_threadsafe", (create_task, forward_coro))?;

    Ok(rx)
}
1693
1694/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1695///
1696/// **This API is marked as unstable** and is only available when the
1697/// `unstable-streams` crate feature is enabled. This comes with no
1698/// stability guarantees, and could be changed or removed at any time.
1699///
1700/// # Arguments
1701/// * `gen` - The Python async generator to be converted
1702///
1703/// # Examples
1704/// ```no_run
1705/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1706/// #
1707/// # use pyo3_async_runtimes::{
1708/// # TaskLocals,
1709/// # generic::{JoinError, ContextExt, Runtime}
1710/// # };
1711/// #
1712/// # struct MyCustomJoinError;
1713/// #
1714/// # impl JoinError for MyCustomJoinError {
1715/// # fn is_panic(&self) -> bool {
1716/// # unreachable!()
1717/// # }
1718/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1719/// # unreachable!()
1720/// # }
1721/// # }
1722/// #
1723/// # struct MyCustomJoinHandle;
1724/// #
1725/// # impl Future for MyCustomJoinHandle {
1726/// # type Output = Result<(), MyCustomJoinError>;
1727/// #
1728/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1729/// # unreachable!()
1730/// # }
1731/// # }
1732/// #
1733/// # struct MyCustomRuntime;
1734/// #
1735/// # impl Runtime for MyCustomRuntime {
1736/// # type JoinError = MyCustomJoinError;
1737/// # type JoinHandle = MyCustomJoinHandle;
1738/// #
1739/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1740/// # where
1741/// # F: Future<Output = ()> + Send + 'static
1742/// # {
1743/// # unreachable!()
1744/// # }
1745/// # }
1746/// #
1747/// # impl ContextExt for MyCustomRuntime {
1748/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1749/// # where
1750/// # F: Future<Output = R> + Send + 'static
1751/// # {
1752/// # unreachable!()
1753/// # }
1754/// # fn get_task_locals() -> Option<TaskLocals> {
1755/// # unreachable!()
1756/// # }
1757/// # }
1758///
1759/// use pyo3::prelude::*;
1760/// use futures::{StreamExt, TryStreamExt};
1761/// use std::ffi::CString;
1762///
1763/// const TEST_MOD: &str = r#"
1764/// import asyncio
1765///
1766/// async def gen():
1767/// for i in range(10):
1768/// await asyncio.sleep(0.1)
1769/// yield i
1770/// "#;
1771///
1772/// # async fn test_async_gen() -> PyResult<()> {
1773/// let stream = Python::attach(|py| {
1774/// let test_mod = PyModule::from_code(
1775/// py,
1776/// &CString::new(TEST_MOD).unwrap(),
1777/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(),
1778/// &CString::new("test_mod").unwrap(),
1779/// )?;
1780///
1781/// pyo3_async_runtimes::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1782/// })?;
1783///
1784/// let vals = stream
1785/// .map(|item| Python::attach(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1786/// .try_collect::<Vec<i32>>()
1787/// .await?;
1788///
1789/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1790///
1791/// Ok(())
1792/// # }
1793/// ```
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<R>(
    gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
where
    R: Runtime + ContextExt,
{
    // Look up the current task locals through the generator's Python token,
    // then defer to the explicit-locals variant.
    let py = gen.py();
    let current_locals = get_current_locals::<R>(py)?;

    into_stream_with_locals_v2::<R>(current_locals, gen)
}