pyo3_asyncio_0_21/generic.rs
1//! Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime
2//!
3//! Items marked with
4//! <span
5//! class="module-item stab portability"
6//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
7//! ><code>unstable-streams</code></span>
8//! are only available when the `unstable-streams` Cargo feature is enabled:
9//!
10//! ```toml
11//! [dependencies.pyo3-asyncio-0-21]
12//! version = "0.21"
13//! features = ["unstable-streams"]
14//! ```
15
16use std::{
17 future::Future,
18 marker::PhantomData,
19 pin::Pin,
20 sync::{Arc, Mutex},
21 task::{Context, Poll},
22};
23
24use futures::{
25 channel::{mpsc, oneshot},
26 SinkExt,
27};
28use once_cell::sync::OnceCell;
29use pin_project_lite::pin_project;
30use pyo3::prelude::*;
31
32use crate::{
33 asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
34 get_running_loop, into_future_with_locals, TaskLocals,
35};
36
37/// Generic utilities for a JoinError
38pub trait JoinError {
39 /// Check if the spawned task exited because of a panic
40 fn is_panic(&self) -> bool;
41 /// Get the panic object associated with the error. Panics if `is_panic` is not true.
42 fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static>;
43}
44
45/// Generic Rust async/await runtime
46pub trait Runtime: Send + 'static {
47 /// The error returned by a JoinHandle after being awaited
48 type JoinError: JoinError + Send;
49 /// A future that completes with the result of the spawned task
50 type JoinHandle: Future<Output = Result<(), Self::JoinError>> + Send;
51
52 /// Spawn a future onto this runtime's event loop
53 fn spawn<F>(fut: F) -> Self::JoinHandle
54 where
55 F: Future<Output = ()> + Send + 'static;
56}
57
58/// Extension trait for async/await runtimes that support spawning local tasks
59pub trait SpawnLocalExt: Runtime {
60 /// Spawn a !Send future onto this runtime's event loop
61 fn spawn_local<F>(fut: F) -> Self::JoinHandle
62 where
63 F: Future<Output = ()> + 'static;
64}
65
66/// Exposes the utilities necessary for using task-local data in the Runtime
67pub trait ContextExt: Runtime {
68 /// Set the task locals for the given future
69 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
70 where
71 F: Future<Output = R> + Send + 'static;
72
73 /// Get the task locals for the current task
74 fn get_task_locals() -> Option<TaskLocals>;
75}
76
77/// Adds the ability to scope task-local data for !Send futures
78pub trait LocalContextExt: Runtime {
79 /// Set the task locals for the given !Send future
80 fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
81 where
82 F: Future<Output = R> + 'static;
83}
84
85/// Get the current event loop from either Python or Rust async task local context
86///
87/// This function first checks if the runtime has a task-local reference to the Python event loop.
88/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated
89/// with the current OS thread.
90pub fn get_current_loop<R>(py: Python) -> PyResult<Bound<PyAny>>
91where
92 R: ContextExt,
93{
94 if let Some(locals) = R::get_task_locals() {
95 Ok(locals.event_loop.into_bound(py))
96 } else {
97 get_running_loop(py)
98 }
99}
100
101/// Either copy the task locals from the current task OR get the current running loop and
102/// contextvars from Python.
103pub fn get_current_locals<R>(py: Python) -> PyResult<TaskLocals>
104where
105 R: ContextExt,
106{
107 if let Some(locals) = R::get_task_locals() {
108 Ok(locals)
109 } else {
110 Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?)
111 }
112}
113
114/// Run the event loop until the given Future completes
115///
116/// After this function returns, the event loop can be resumed with [`run_until_complete`]
117///
118/// # Arguments
119/// * `event_loop` - The Python event loop that should run the future
120/// * `fut` - The future to drive to completion
121///
122/// # Examples
123///
124/// ```no_run
125/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
126/// #
127/// # use pyo3_asyncio_0_21::{
128/// # TaskLocals,
129/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
130/// # };
131/// #
132/// # struct MyCustomJoinError;
133/// #
134/// # impl JoinError for MyCustomJoinError {
135/// # fn is_panic(&self) -> bool {
136/// # unreachable!()
137/// # }
138/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
139/// # unreachable!()
140/// # }
141/// # }
142/// #
143/// # struct MyCustomJoinHandle;
144/// #
145/// # impl Future for MyCustomJoinHandle {
146/// # type Output = Result<(), MyCustomJoinError>;
147/// #
148/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
149/// # unreachable!()
150/// # }
151/// # }
152/// #
153/// # struct MyCustomRuntime;
154/// #
155/// # impl Runtime for MyCustomRuntime {
156/// # type JoinError = MyCustomJoinError;
157/// # type JoinHandle = MyCustomJoinHandle;
158/// #
159/// # fn spawn<F>(fut: F) -> Self::JoinHandle
160/// # where
161/// # F: Future<Output = ()> + Send + 'static
162/// # {
163/// # unreachable!()
164/// # }
165/// # }
166/// #
167/// # impl ContextExt for MyCustomRuntime {
168/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
169/// # where
170/// # F: Future<Output = R> + Send + 'static
171/// # {
172/// # unreachable!()
173/// # }
174/// # fn get_task_locals() -> Option<TaskLocals> {
175/// # unreachable!()
176/// # }
177/// # }
178/// #
179/// # use std::time::Duration;
180/// #
181/// # use pyo3::prelude::*;
182/// #
183/// # Python::with_gil(|py| -> PyResult<()> {
184/// # let event_loop = py.import_bound("asyncio")?.call_method0("new_event_loop")?;
185/// # #[cfg(feature = "tokio-runtime")]
186/// pyo3_asyncio_0_21::generic::run_until_complete::<MyCustomRuntime, _, _>(&event_loop, async move {
187/// tokio::time::sleep(Duration::from_secs(1)).await;
188/// Ok(())
189/// })?;
190/// # Ok(())
191/// # }).unwrap();
192/// ```
193pub fn run_until_complete<R, F, T>(event_loop: &Bound<PyAny>, fut: F) -> PyResult<T>
194where
195 R: Runtime + ContextExt,
196 F: Future<Output = PyResult<T>> + Send + 'static,
197 T: Send + Sync + 'static,
198{
199 let py = event_loop.py();
200 let result_tx = Arc::new(Mutex::new(None));
201 let result_rx = Arc::clone(&result_tx);
202 let coro = future_into_py_with_locals::<R, _, ()>(
203 py,
204 TaskLocals::new(event_loop.clone()).copy_context(py)?,
205 async move {
206 let val = fut.await?;
207 if let Ok(mut result) = result_tx.lock() {
208 *result = Some(val);
209 }
210 Ok(())
211 },
212 )?;
213
214 event_loop.call_method1("run_until_complete", (coro,))?;
215
216 let result = result_rx.lock().unwrap().take().unwrap();
217 Ok(result)
218}
219
220/// Run the event loop until the given Future completes
221///
222/// # Arguments
223/// * `py` - The current PyO3 GIL guard
224/// * `fut` - The future to drive to completion
225///
226/// # Examples
227///
228/// ```no_run
229/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
230/// #
231/// # use pyo3_asyncio_0_21::{
232/// # TaskLocals,
233/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
234/// # };
235/// #
236/// # struct MyCustomJoinError;
237/// #
238/// # impl JoinError for MyCustomJoinError {
239/// # fn is_panic(&self) -> bool {
240/// # unreachable!()
241/// # }
242/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
243/// # unreachable!()
244/// # }
245/// # }
246/// #
247/// # struct MyCustomJoinHandle;
248/// #
249/// # impl Future for MyCustomJoinHandle {
250/// # type Output = Result<(), MyCustomJoinError>;
251/// #
252/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
253/// # unreachable!()
254/// # }
255/// # }
256/// #
257/// # struct MyCustomRuntime;
258/// #
259/// # impl Runtime for MyCustomRuntime {
260/// # type JoinError = MyCustomJoinError;
261/// # type JoinHandle = MyCustomJoinHandle;
262/// #
263/// # fn spawn<F>(fut: F) -> Self::JoinHandle
264/// # where
265/// # F: Future<Output = ()> + Send + 'static
266/// # {
267/// # unreachable!()
268/// # }
269/// # }
270/// #
271/// # impl ContextExt for MyCustomRuntime {
272/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
273/// # where
274/// # F: Future<Output = R> + Send + 'static
275/// # {
276/// # unreachable!()
277/// # }
278/// # fn get_task_locals() -> Option<TaskLocals> {
279/// # unreachable!()
280/// # }
281/// # }
282/// #
283/// # use std::time::Duration;
284/// # async fn custom_sleep(_duration: Duration) { }
285/// #
286/// # use pyo3::prelude::*;
287/// #
288/// fn main() {
289/// Python::with_gil(|py| {
290/// pyo3_asyncio_0_21::generic::run::<MyCustomRuntime, _, _>(py, async move {
291/// custom_sleep(Duration::from_secs(1)).await;
292/// Ok(())
293/// })
294/// .map_err(|e| {
295/// e.print_and_set_sys_last_vars(py);
296/// })
297/// .unwrap();
298/// })
299/// }
300/// ```
301pub fn run<R, F, T>(py: Python, fut: F) -> PyResult<T>
302where
303 R: Runtime + ContextExt,
304 F: Future<Output = PyResult<T>> + Send + 'static,
305 T: Send + Sync + 'static,
306{
307 let event_loop = asyncio(py)?.call_method0("new_event_loop")?;
308
309 let result = run_until_complete::<R, F, T>(&event_loop, fut);
310
311 close(event_loop)?;
312
313 result
314}
315
316fn cancelled(future: &Bound<PyAny>) -> PyResult<bool> {
317 future.getattr("cancelled")?.call0()?.is_truthy()
318}
319
320#[pyclass]
321struct CheckedCompletor;
322
323#[pymethods]
324impl CheckedCompletor {
325 fn __call__(
326 &self,
327 future: &Bound<PyAny>,
328 complete: &Bound<PyAny>,
329 value: &Bound<PyAny>,
330 ) -> PyResult<()> {
331 if cancelled(future)? {
332 return Ok(());
333 }
334
335 complete.call1((value,))?;
336
337 Ok(())
338 }
339}
340
341fn set_result(
342 event_loop: &Bound<PyAny>,
343 future: &Bound<PyAny>,
344 result: PyResult<PyObject>,
345) -> PyResult<()> {
346 let py = event_loop.py();
347 let none = py.None().into_bound(py);
348
349 let (complete, val) = match result {
350 Ok(val) => (future.getattr("set_result")?, val.into_py(py)),
351 Err(err) => (future.getattr("set_exception")?, err.into_py(py)),
352 };
353 call_soon_threadsafe(event_loop, &none, (CheckedCompletor, future, complete, val))?;
354
355 Ok(())
356}
357
358/// Convert a Python `awaitable` into a Rust Future
359///
360/// This function simply forwards the future and the task locals returned by [`get_current_locals`]
361/// to [`into_future_with_locals`](`crate::into_future_with_locals`). See
362/// [`into_future_with_locals`](`crate::into_future_with_locals`) for more details.
363///
364/// # Arguments
365/// * `awaitable` - The Python `awaitable` to be converted
366///
367/// # Examples
368///
369/// ```no_run
370/// # use std::{any::Any, pin::Pin, future::Future, task::{Context, Poll}, time::Duration};
371/// #
372/// # use pyo3::prelude::*;
373/// #
374/// # use pyo3_asyncio_0_21::{
375/// # TaskLocals,
376/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
377/// # };
378/// #
379/// # struct MyCustomJoinError;
380/// #
381/// # impl JoinError for MyCustomJoinError {
382/// # fn is_panic(&self) -> bool {
383/// # unreachable!()
384/// # }
385/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
386/// # unreachable!()
387/// # }
388/// # }
389/// #
390/// # struct MyCustomJoinHandle;
391/// #
392/// # impl Future for MyCustomJoinHandle {
393/// # type Output = Result<(), MyCustomJoinError>;
394/// #
395/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
396/// # unreachable!()
397/// # }
398/// # }
399/// #
400/// # struct MyCustomRuntime;
401/// #
402/// # impl MyCustomRuntime {
403/// # async fn sleep(_: Duration) {
404/// # unreachable!()
405/// # }
406/// # }
407/// #
408/// # impl Runtime for MyCustomRuntime {
409/// # type JoinError = MyCustomJoinError;
410/// # type JoinHandle = MyCustomJoinHandle;
411/// #
412/// # fn spawn<F>(fut: F) -> Self::JoinHandle
413/// # where
414/// # F: Future<Output = ()> + Send + 'static
415/// # {
416/// # unreachable!()
417/// # }
418/// # }
419/// #
420/// # impl ContextExt for MyCustomRuntime {
421/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
422/// # where
423/// # F: Future<Output = R> + Send + 'static
424/// # {
425/// # unreachable!()
426/// # }
427/// # fn get_task_locals() -> Option<TaskLocals> {
428/// # unreachable!()
429/// # }
430/// # }
431/// #
432/// const PYTHON_CODE: &'static str = r#"
433/// import asyncio
434///
435/// async def py_sleep(duration):
436/// await asyncio.sleep(duration)
437/// "#;
438///
439/// async fn py_sleep(seconds: f32) -> PyResult<()> {
440/// let test_mod = Python::with_gil(|py| -> PyResult<PyObject> {
441/// Ok(
442/// PyModule::from_code_bound(
443/// py,
444/// PYTHON_CODE,
445/// "test_into_future/test_mod.py",
446/// "test_mod"
447/// )?
448/// .into()
449/// )
450/// })?;
451///
452/// Python::with_gil(|py| {
453/// pyo3_asyncio_0_21::generic::into_future::<MyCustomRuntime>(
454/// test_mod
455/// .call_method1(py, "py_sleep", (seconds.into_py(py),))?
456/// .into_bound(py),
457/// )
458/// })?
459/// .await?;
460/// Ok(())
461/// }
462/// ```
463pub fn into_future<R>(
464 awaitable: Bound<PyAny>,
465) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
466where
467 R: Runtime + ContextExt,
468{
469 into_future_with_locals(&get_current_locals::<R>(awaitable.py())?, awaitable)
470}
471
472/// Convert a Rust Future into a Python awaitable with a generic runtime
473///
474/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
475/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
476///
477/// Python `contextvars` are preserved when calling async Python functions within the Rust future
478/// via [`into_future`] (new behaviour in `v0.15`).
479///
480/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
481/// unfortunately fail to resolve them when called within the Rust future. This is because the
482/// function is being called from a Rust thread, not inside an actual Python coroutine context.
483/// >
484/// > As a workaround, you can get the `contextvars` from the current task locals using
485/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
486/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
487/// synchronous function, and restore the previous context when it returns or raises an exception.
488///
489/// # Arguments
490/// * `py` - PyO3 GIL guard
491/// * `locals` - The task-local data for Python
492/// * `fut` - The Rust future to be converted
493///
494/// # Examples
495///
496/// ```no_run
497/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
498/// #
499/// # use pyo3_asyncio_0_21::{
500/// # TaskLocals,
501/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
502/// # };
503/// #
504/// # struct MyCustomJoinError;
505/// #
506/// # impl JoinError for MyCustomJoinError {
507/// # fn is_panic(&self) -> bool {
508/// # unreachable!()
509/// # }
510/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
511/// # unreachable!()
512/// # }
513/// # }
514/// #
515/// # struct MyCustomJoinHandle;
516/// #
517/// # impl Future for MyCustomJoinHandle {
518/// # type Output = Result<(), MyCustomJoinError>;
519/// #
520/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
521/// # unreachable!()
522/// # }
523/// # }
524/// #
525/// # struct MyCustomRuntime;
526/// #
527/// # impl MyCustomRuntime {
528/// # async fn sleep(_: Duration) {
529/// # unreachable!()
530/// # }
531/// # }
532/// #
533/// # impl Runtime for MyCustomRuntime {
534/// # type JoinError = MyCustomJoinError;
535/// # type JoinHandle = MyCustomJoinHandle;
536/// #
537/// # fn spawn<F>(fut: F) -> Self::JoinHandle
538/// # where
539/// # F: Future<Output = ()> + Send + 'static
540/// # {
541/// # unreachable!()
542/// # }
543/// # }
544/// #
545/// # impl ContextExt for MyCustomRuntime {
546/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
547/// # where
548/// # F: Future<Output = R> + Send + 'static
549/// # {
550/// # unreachable!()
551/// # }
552/// # fn get_task_locals() -> Option<TaskLocals> {
553/// # unreachable!()
554/// # }
555/// # }
556/// #
557/// use std::time::Duration;
558///
559/// use pyo3::prelude::*;
560///
561/// /// Awaitable sleep function
562/// #[pyfunction]
563/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
564/// let secs = secs.extract()?;
565/// pyo3_asyncio_0_21::generic::future_into_py_with_locals::<MyCustomRuntime, _, _>(
566/// py,
567/// pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
568/// async move {
569/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
570/// Ok(())
571/// }
572/// )
573/// }
574/// ```
575#[allow(unused_must_use)]
576pub fn future_into_py_with_locals<R, F, T>(
577 py: Python,
578 locals: TaskLocals,
579 fut: F,
580) -> PyResult<Bound<PyAny>>
581where
582 R: Runtime + ContextExt,
583 F: Future<Output = PyResult<T>> + Send + 'static,
584 T: IntoPy<PyObject>,
585{
586 let (cancel_tx, cancel_rx) = oneshot::channel();
587
588 let py_fut = create_future(locals.event_loop.clone().into_bound(py))?;
589 py_fut.call_method1(
590 "add_done_callback",
591 (PyDoneCallback {
592 cancel_tx: Some(cancel_tx),
593 },),
594 )?;
595
596 let future_tx1 = PyObject::from(py_fut.clone());
597 let future_tx2 = future_tx1.clone();
598
599 R::spawn(async move {
600 let locals2 = locals.clone();
601
602 if let Err(e) = R::spawn(async move {
603 let result = R::scope(
604 locals2.clone(),
605 Cancellable::new_with_cancel_rx(fut, cancel_rx),
606 )
607 .await;
608
609 Python::with_gil(move |py| {
610 if cancelled(future_tx1.bind(py))
611 .map_err(dump_err(py))
612 .unwrap_or(false)
613 {
614 return;
615 }
616
617 let _ = set_result(
618 &locals2.event_loop(py),
619 future_tx1.bind(py),
620 result.map(|val| val.into_py(py)),
621 )
622 .map_err(dump_err(py));
623 });
624 })
625 .await
626 {
627 if e.is_panic() {
628 Python::with_gil(move |py| {
629 if cancelled(future_tx2.bind(py))
630 .map_err(dump_err(py))
631 .unwrap_or(false)
632 {
633 return;
634 }
635
636 let panic_message = format!(
637 "rust future panicked: {}",
638 get_panic_message(&e.into_panic())
639 );
640 let _ = set_result(
641 locals.event_loop.bind(py),
642 future_tx2.bind(py),
643 Err(RustPanic::new_err(panic_message)),
644 )
645 .map_err(dump_err(py));
646 });
647 }
648 }
649 });
650
651 Ok(py_fut)
652}
653
654fn get_panic_message(any: &dyn std::any::Any) -> &str {
655 if let Some(str_slice) = any.downcast_ref::<&str>() {
656 str_slice
657 } else if let Some(string) = any.downcast_ref::<String>() {
658 string
659 } else {
660 "unknown error"
661 }
662}
663
664pin_project! {
665 /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
666 #[must_use = "futures do nothing unless you `.await` or poll them"]
667 #[derive(Debug)]
668 struct Cancellable<T> {
669 #[pin]
670 future: T,
671 #[pin]
672 cancel_rx: oneshot::Receiver<()>,
673
674 poll_cancel_rx: bool
675 }
676}
677
678impl<T> Cancellable<T> {
679 fn new_with_cancel_rx(future: T, cancel_rx: oneshot::Receiver<()>) -> Self {
680 Self {
681 future,
682 cancel_rx,
683
684 poll_cancel_rx: true,
685 }
686 }
687}
688
689impl<F, T> Future for Cancellable<F>
690where
691 F: Future<Output = PyResult<T>>,
692 T: IntoPy<PyObject>,
693{
694 type Output = F::Output;
695
696 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
697 let this = self.project();
698
699 // First, try polling the future
700 if let Poll::Ready(v) = this.future.poll(cx) {
701 return Poll::Ready(v);
702 }
703
704 // Now check for cancellation
705 if *this.poll_cancel_rx {
706 match this.cancel_rx.poll(cx) {
707 Poll::Ready(Ok(())) => {
708 *this.poll_cancel_rx = false;
709 // The python future has already been cancelled, so this return value will never
710 // be used.
711 Poll::Ready(Err(pyo3::exceptions::PyBaseException::new_err(
712 "unreachable",
713 )))
714 }
715 Poll::Ready(Err(_)) => {
716 *this.poll_cancel_rx = false;
717 Poll::Pending
718 }
719 Poll::Pending => Poll::Pending,
720 }
721 } else {
722 Poll::Pending
723 }
724 }
725}
726
727#[pyclass]
728struct PyDoneCallback {
729 cancel_tx: Option<oneshot::Sender<()>>,
730}
731
732#[pymethods]
733impl PyDoneCallback {
734 pub fn __call__(&mut self, fut: &Bound<PyAny>) -> PyResult<()> {
735 let py = fut.py();
736
737 if cancelled(fut).map_err(dump_err(py)).unwrap_or(false) {
738 let _ = self.cancel_tx.take().unwrap().send(());
739 }
740
741 Ok(())
742 }
743}
744
745/// Convert a Rust Future into a Python awaitable with a generic runtime
746///
747/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
748/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
749///
750/// Python `contextvars` are preserved when calling async Python functions within the Rust future
751/// via [`into_future`] (new behaviour in `v0.15`).
752///
753/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
754/// unfortunately fail to resolve them when called within the Rust future. This is because the
755/// function is being called from a Rust thread, not inside an actual Python coroutine context.
756/// >
757/// > As a workaround, you can get the `contextvars` from the current task locals using
758/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
759/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
760/// synchronous function, and restore the previous context when it returns or raises an exception.
761///
762/// # Arguments
763/// * `py` - The current PyO3 GIL guard
764/// * `fut` - The Rust future to be converted
765///
766/// # Examples
767///
768/// ```no_run
769/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
770/// #
771/// # use pyo3_asyncio_0_21::{
772/// # TaskLocals,
773/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
774/// # };
775/// #
776/// # struct MyCustomJoinError;
777/// #
778/// # impl JoinError for MyCustomJoinError {
779/// # fn is_panic(&self) -> bool {
780/// # unreachable!()
781/// # }
782/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
783/// # unreachable!()
784/// # }
785/// # }
786/// #
787/// # struct MyCustomJoinHandle;
788/// #
789/// # impl Future for MyCustomJoinHandle {
790/// # type Output = Result<(), MyCustomJoinError>;
791/// #
792/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
793/// # unreachable!()
794/// # }
795/// # }
796/// #
797/// # struct MyCustomRuntime;
798/// #
799/// # impl MyCustomRuntime {
800/// # async fn sleep(_: Duration) {
801/// # unreachable!()
802/// # }
803/// # }
804/// #
805/// # impl Runtime for MyCustomRuntime {
806/// # type JoinError = MyCustomJoinError;
807/// # type JoinHandle = MyCustomJoinHandle;
808/// #
809/// # fn spawn<F>(fut: F) -> Self::JoinHandle
810/// # where
811/// # F: Future<Output = ()> + Send + 'static
812/// # {
813/// # unreachable!()
814/// # }
815/// # }
816/// #
817/// # impl ContextExt for MyCustomRuntime {
818/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
819/// # where
820/// # F: Future<Output = R> + Send + 'static
821/// # {
822/// # unreachable!()
823/// # }
824/// # fn get_task_locals() -> Option<TaskLocals> {
825/// # unreachable!()
826/// # }
827/// # }
828/// #
829/// use std::time::Duration;
830///
831/// use pyo3::prelude::*;
832///
833/// /// Awaitable sleep function
834/// #[pyfunction]
835/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
836/// let secs = secs.extract()?;
837/// pyo3_asyncio_0_21::generic::future_into_py::<MyCustomRuntime, _, _>(py, async move {
838/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
839/// Ok(())
840/// })
841/// }
842/// ```
843pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
844where
845 R: Runtime + ContextExt,
846 F: Future<Output = PyResult<T>> + Send + 'static,
847 T: IntoPy<PyObject>,
848{
849 future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
850}
851
852/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime and manual
853/// specification of task locals.
854///
855/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
856/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
857///
858/// Python `contextvars` are preserved when calling async Python functions within the Rust future
859/// via [`into_future`] (new behaviour in `v0.15`).
860///
861/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
862/// unfortunately fail to resolve them when called within the Rust future. This is because the
863/// function is being called from a Rust thread, not inside an actual Python coroutine context.
864/// >
865/// > As a workaround, you can get the `contextvars` from the current task locals using
866/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
867/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
868/// synchronous function, and restore the previous context when it returns or raises an exception.
869///
870/// # Arguments
871/// * `py` - PyO3 GIL guard
872/// * `locals` - The task locals for the future
873/// * `fut` - The Rust future to be converted
874///
875/// # Examples
876///
877/// ```no_run
878/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
879/// #
880/// # use pyo3_asyncio_0_21::{
881/// # TaskLocals,
882/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
883/// # };
884/// #
885/// # struct MyCustomJoinError;
886/// #
887/// # impl JoinError for MyCustomJoinError {
888/// # fn is_panic(&self) -> bool {
889/// # unreachable!()
890/// # }
891/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
892/// # unreachable!()
893/// # }
894/// # }
895/// #
896/// # struct MyCustomJoinHandle;
897/// #
898/// # impl Future for MyCustomJoinHandle {
899/// # type Output = Result<(), MyCustomJoinError>;
900/// #
901/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
902/// # unreachable!()
903/// # }
904/// # }
905/// #
906/// # struct MyCustomRuntime;
907/// #
908/// # impl MyCustomRuntime {
909/// # async fn sleep(_: Duration) {
910/// # unreachable!()
911/// # }
912/// # }
913/// #
914/// # impl Runtime for MyCustomRuntime {
915/// # type JoinError = MyCustomJoinError;
916/// # type JoinHandle = MyCustomJoinHandle;
917/// #
918/// # fn spawn<F>(fut: F) -> Self::JoinHandle
919/// # where
920/// # F: Future<Output = ()> + Send + 'static
921/// # {
922/// # unreachable!()
923/// # }
924/// # }
925/// #
926/// # impl ContextExt for MyCustomRuntime {
927/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
928/// # where
929/// # F: Future<Output = R> + Send + 'static
930/// # {
931/// # unreachable!()
932/// # }
933/// # fn get_task_locals() -> Option<TaskLocals> {
934/// # unreachable!()
935/// # }
936/// # }
937/// #
938/// # impl SpawnLocalExt for MyCustomRuntime {
939/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
940/// # where
941/// # F: Future<Output = ()> + 'static
942/// # {
943/// # unreachable!()
944/// # }
945/// # }
946/// #
947/// # impl LocalContextExt for MyCustomRuntime {
948/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
949/// # where
950/// # F: Future<Output = R> + 'static
951/// # {
952/// # unreachable!()
953/// # }
954/// # }
955/// #
956/// use std::{rc::Rc, time::Duration};
957///
958/// use pyo3::prelude::*;
959///
960/// /// Awaitable sleep function
961/// #[pyfunction]
962/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
963/// // Rc is !Send so it cannot be passed into pyo3_asyncio_0_21::generic::future_into_py
964/// let secs = Rc::new(secs);
965///
966/// pyo3_asyncio_0_21::generic::local_future_into_py_with_locals::<MyCustomRuntime, _, _>(
967/// py,
968/// pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
969/// async move {
970/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
971/// Ok(())
972/// }
973/// )
974/// }
975/// ```
976#[deprecated(
977 since = "0.18.0",
978 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
979)]
980#[allow(unused_must_use)]
981pub fn local_future_into_py_with_locals<R, F, T>(
982 py: Python,
983 locals: TaskLocals,
984 fut: F,
985) -> PyResult<Bound<PyAny>>
986where
987 R: Runtime + SpawnLocalExt + LocalContextExt,
988 F: Future<Output = PyResult<T>> + 'static,
989 T: IntoPy<PyObject>,
990{
991 let (cancel_tx, cancel_rx) = oneshot::channel();
992
993 let py_fut = create_future(locals.event_loop.clone().into_bound(py))?;
994 py_fut.call_method1(
995 "add_done_callback",
996 (PyDoneCallback {
997 cancel_tx: Some(cancel_tx),
998 },),
999 )?;
1000
1001 let future_tx1 = PyObject::from(py_fut.clone());
1002 let future_tx2 = future_tx1.clone();
1003
1004 R::spawn_local(async move {
1005 let locals2 = locals.clone();
1006
1007 if let Err(e) = R::spawn_local(async move {
1008 let result = R::scope_local(
1009 locals2.clone(),
1010 Cancellable::new_with_cancel_rx(fut, cancel_rx),
1011 )
1012 .await;
1013
1014 Python::with_gil(move |py| {
1015 if cancelled(future_tx1.bind(py))
1016 .map_err(dump_err(py))
1017 .unwrap_or(false)
1018 {
1019 return;
1020 }
1021
1022 let _ = set_result(
1023 locals2.event_loop.bind(py),
1024 future_tx1.bind(py),
1025 result.map(|val| val.into_py(py)),
1026 )
1027 .map_err(dump_err(py));
1028 });
1029 })
1030 .await
1031 {
1032 if e.is_panic() {
1033 Python::with_gil(move |py| {
1034 if cancelled(future_tx2.bind(py))
1035 .map_err(dump_err(py))
1036 .unwrap_or(false)
1037 {
1038 return;
1039 }
1040
1041 let panic_message = format!(
1042 "rust future panicked: {}",
1043 get_panic_message(&e.into_panic())
1044 );
1045 let _ = set_result(
1046 locals.event_loop.bind(py),
1047 future_tx2.bind(py),
1048 Err(RustPanic::new_err(panic_message)),
1049 )
1050 .map_err(dump_err(py));
1051 });
1052 }
1053 }
1054 });
1055
1056 Ok(py_fut)
1057}
1058
1059/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime
1060///
1061/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`,
1062/// the Rust future will be cancelled as well (new behaviour in `v0.15`).
1063///
1064/// Python `contextvars` are preserved when calling async Python functions within the Rust future
1065/// via [`into_future`] (new behaviour in `v0.15`).
1066///
1067/// > Although `contextvars` are preserved for async Python functions, synchronous functions will
1068/// unfortunately fail to resolve them when called within the Rust future. This is because the
1069/// function is being called from a Rust thread, not inside an actual Python coroutine context.
1070/// >
1071/// > As a workaround, you can get the `contextvars` from the current task locals using
1072/// [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your
1073/// synchronous function in a call to `contextvars.Context.run`. This will set the context, call the
1074/// synchronous function, and restore the previous context when it returns or raises an exception.
1075///
1076/// # Arguments
1077/// * `py` - The current PyO3 GIL guard
1078/// * `fut` - The Rust future to be converted
1079///
1080/// # Examples
1081///
1082/// ```no_run
1083/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1084/// #
1085/// # use pyo3_asyncio_0_21::{
1086/// # TaskLocals,
1087/// # generic::{JoinError, SpawnLocalExt, ContextExt, LocalContextExt, Runtime}
1088/// # };
1089/// #
1090/// # struct MyCustomJoinError;
1091/// #
1092/// # impl JoinError for MyCustomJoinError {
1093/// # fn is_panic(&self) -> bool {
1094/// # unreachable!()
1095/// # }
1096/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1097/// # unreachable!()
1098/// # }
1099/// # }
1100/// #
1101/// # struct MyCustomJoinHandle;
1102/// #
1103/// # impl Future for MyCustomJoinHandle {
1104/// # type Output = Result<(), MyCustomJoinError>;
1105/// #
1106/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1107/// # unreachable!()
1108/// # }
1109/// # }
1110/// #
1111/// # struct MyCustomRuntime;
1112/// #
1113/// # impl MyCustomRuntime {
1114/// # async fn sleep(_: Duration) {
1115/// # unreachable!()
1116/// # }
1117/// # }
1118/// #
1119/// # impl Runtime for MyCustomRuntime {
1120/// # type JoinError = MyCustomJoinError;
1121/// # type JoinHandle = MyCustomJoinHandle;
1122/// #
1123/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1124/// # where
1125/// # F: Future<Output = ()> + Send + 'static
1126/// # {
1127/// # unreachable!()
1128/// # }
1129/// # }
1130/// #
1131/// # impl ContextExt for MyCustomRuntime {
1132/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1133/// # where
1134/// # F: Future<Output = R> + Send + 'static
1135/// # {
1136/// # unreachable!()
1137/// # }
1138/// # fn get_task_locals() -> Option<TaskLocals> {
1139/// # unreachable!()
1140/// # }
1141/// # }
1142/// #
1143/// # impl SpawnLocalExt for MyCustomRuntime {
1144/// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
1145/// # where
1146/// # F: Future<Output = ()> + 'static
1147/// # {
1148/// # unreachable!()
1149/// # }
1150/// # }
1151/// #
1152/// # impl LocalContextExt for MyCustomRuntime {
1153/// # fn scope_local<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R>>>
1154/// # where
1155/// # F: Future<Output = R> + 'static
1156/// # {
1157/// # unreachable!()
1158/// # }
1159/// # }
1160/// #
1161/// use std::{rc::Rc, time::Duration};
1162///
1163/// use pyo3::prelude::*;
1164///
1165/// /// Awaitable sleep function
1166/// #[pyfunction]
1167/// fn sleep_for(py: Python, secs: u64) -> PyResult<Bound<PyAny>> {
1168/// // Rc is !Send so it cannot be passed into pyo3_asyncio_0_21::generic::future_into_py
1169/// let secs = Rc::new(secs);
1170///
1171/// pyo3_asyncio_0_21::generic::local_future_into_py::<MyCustomRuntime, _, _>(py, async move {
1172/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await;
1173/// Ok(())
1174/// })
1175/// }
1176/// ```
1177#[deprecated(
1178 since = "0.18.0",
1179 note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)"
1180)]
1181#[allow(deprecated)]
1182pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
1183where
1184 R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
1185 F: Future<Output = PyResult<T>> + 'static,
1186 T: IntoPy<PyObject>,
1187{
1188 local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
1189}
1190
1191/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1192///
1193/// **This API is marked as unstable** and is only available when the
1194/// `unstable-streams` crate feature is enabled. This comes with no
1195/// stability guarantees, and could be changed or removed at any time.
1196///
1197/// # Arguments
1198/// * `locals` - The current task locals
1199/// * `gen` - The Python async generator to be converted
1200///
1201/// # Examples
1202/// ```no_run
1203/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1204/// #
1205/// # use pyo3_asyncio_0_21::{
1206/// # TaskLocals,
1207/// # generic::{JoinError, ContextExt, Runtime}
1208/// # };
1209/// #
1210/// # struct MyCustomJoinError;
1211/// #
1212/// # impl JoinError for MyCustomJoinError {
1213/// # fn is_panic(&self) -> bool {
1214/// # unreachable!()
1215/// # }
1216/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1217/// # unreachable!()
1218/// # }
1219/// # }
1220/// #
1221/// # struct MyCustomJoinHandle;
1222/// #
1223/// # impl Future for MyCustomJoinHandle {
1224/// # type Output = Result<(), MyCustomJoinError>;
1225/// #
1226/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1227/// # unreachable!()
1228/// # }
1229/// # }
1230/// #
1231/// # struct MyCustomRuntime;
1232/// #
1233/// # impl Runtime for MyCustomRuntime {
1234/// # type JoinError = MyCustomJoinError;
1235/// # type JoinHandle = MyCustomJoinHandle;
1236/// #
1237/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1238/// # where
1239/// # F: Future<Output = ()> + Send + 'static
1240/// # {
1241/// # unreachable!()
1242/// # }
1243/// # }
1244/// #
1245/// # impl ContextExt for MyCustomRuntime {
1246/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1247/// # where
1248/// # F: Future<Output = R> + Send + 'static
1249/// # {
1250/// # unreachable!()
1251/// # }
1252/// # fn get_task_locals() -> Option<TaskLocals> {
1253/// # unreachable!()
1254/// # }
1255/// # }
1256///
1257/// use pyo3::prelude::*;
1258/// use futures::{StreamExt, TryStreamExt};
1259///
1260/// const TEST_MOD: &str = r#"
1261/// import asyncio
1262///
1263/// async def gen():
1264/// for i in range(10):
1265/// await asyncio.sleep(0.1)
1266/// yield i
1267/// "#;
1268///
1269/// # async fn test_async_gen() -> PyResult<()> {
1270/// let stream = Python::with_gil(|py| {
1271/// let test_mod = PyModule::from_code_bound(
1272/// py,
1273/// TEST_MOD,
1274/// "test_rust_coroutine/test_mod.py",
1275/// "test_mod",
1276/// )?;
1277///
1278/// pyo3_asyncio_0_21::generic::into_stream_with_locals_v1::<MyCustomRuntime>(
1279/// pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
1280/// test_mod.call_method0("gen")?
1281/// )
1282/// })?;
1283///
1284/// let vals = stream
1285/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1286/// .try_collect::<Vec<i32>>()
1287/// .await?;
1288///
1289/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1290///
1291/// Ok(())
1292/// # }
1293/// ```
1294#[cfg(feature = "unstable-streams")]
1295#[allow(unused_must_use)] // False positive unused lint on `R::spawn`
1296pub fn into_stream_with_locals_v1<'p, R>(
1297 locals: TaskLocals,
1298 gen: Bound<'p, PyAny>,
1299) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1300where
1301 R: Runtime,
1302{
1303 let (tx, rx) = async_channel::bounded(1);
1304 let anext = PyObject::from(gen.getattr("__anext__")?);
1305
1306 R::spawn(async move {
1307 loop {
1308 let fut = Python::with_gil(|py| -> PyResult<_> {
1309 into_future_with_locals(&locals, anext.bind(py).call0()?)
1310 });
1311 let item = match fut {
1312 Ok(fut) => match fut.await {
1313 Ok(item) => Ok(item),
1314 Err(e) => {
1315 let stop_iter = Python::with_gil(|py| {
1316 e.is_instance_of::<pyo3::exceptions::PyStopAsyncIteration>(py)
1317 });
1318
1319 if stop_iter {
1320 // end the iteration
1321 break;
1322 } else {
1323 Err(e)
1324 }
1325 }
1326 },
1327 Err(e) => Err(e),
1328 };
1329
1330 if tx.send(item).await.is_err() {
1331 // receiving side was dropped
1332 break;
1333 }
1334 }
1335 });
1336
1337 Ok(rx)
1338}
1339
1340/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1341///
1342/// **This API is marked as unstable** and is only available when the
1343/// `unstable-streams` crate feature is enabled. This comes with no
1344/// stability guarantees, and could be changed or removed at any time.
1345///
1346/// # Arguments
1347/// * `gen` - The Python async generator to be converted
1348///
1349/// # Examples
1350/// ```no_run
1351/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1352/// #
1353/// # use pyo3_asyncio_0_21::{
1354/// # TaskLocals,
1355/// # generic::{JoinError, ContextExt, Runtime}
1356/// # };
1357/// #
1358/// # struct MyCustomJoinError;
1359/// #
1360/// # impl JoinError for MyCustomJoinError {
1361/// # fn is_panic(&self) -> bool {
1362/// # unreachable!()
1363/// # }
1364/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1365/// # unreachable!()
1366/// # }
1367/// # }
1368/// #
1369/// # struct MyCustomJoinHandle;
1370/// #
1371/// # impl Future for MyCustomJoinHandle {
1372/// # type Output = Result<(), MyCustomJoinError>;
1373/// #
1374/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1375/// # unreachable!()
1376/// # }
1377/// # }
1378/// #
1379/// # struct MyCustomRuntime;
1380/// #
1381/// # impl Runtime for MyCustomRuntime {
1382/// # type JoinError = MyCustomJoinError;
1383/// # type JoinHandle = MyCustomJoinHandle;
1384/// #
1385/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1386/// # where
1387/// # F: Future<Output = ()> + Send + 'static
1388/// # {
1389/// # unreachable!()
1390/// # }
1391/// # }
1392/// #
1393/// # impl ContextExt for MyCustomRuntime {
1394/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1395/// # where
1396/// # F: Future<Output = R> + Send + 'static
1397/// # {
1398/// # unreachable!()
1399/// # }
1400/// # fn get_task_locals() -> Option<TaskLocals> {
1401/// # unreachable!()
1402/// # }
1403/// # }
1404///
1405/// use pyo3::prelude::*;
1406/// use futures::{StreamExt, TryStreamExt};
1407///
1408/// const TEST_MOD: &str = r#"
1409/// import asyncio
1410///
1411/// async def gen():
1412/// for i in range(10):
1413/// await asyncio.sleep(0.1)
1414/// yield i
1415/// "#;
1416///
1417/// # async fn test_async_gen() -> PyResult<()> {
1418/// let stream = Python::with_gil(|py| {
1419/// let test_mod = PyModule::from_code_bound(
1420/// py,
1421/// TEST_MOD,
1422/// "test_rust_coroutine/test_mod.py",
1423/// "test_mod",
1424/// )?;
1425///
1426/// pyo3_asyncio_0_21::generic::into_stream_v1::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1427/// })?;
1428///
1429/// let vals = stream
1430/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.bind(py).extract()?) }))
1431/// .try_collect::<Vec<i32>>()
1432/// .await?;
1433///
1434/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1435///
1436/// Ok(())
1437/// # }
1438/// ```
1439#[cfg(feature = "unstable-streams")]
1440pub fn into_stream_v1<'p, R>(
1441 gen: Bound<'p, PyAny>,
1442) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static>
1443where
1444 R: Runtime + ContextExt,
1445{
1446 into_stream_with_locals_v1::<R>(get_current_locals::<R>(gen.py())?, gen)
1447}
1448
1449fn py_true() -> PyObject {
1450 static TRUE: OnceCell<PyObject> = OnceCell::new();
1451 TRUE.get_or_init(|| Python::with_gil(|py| true.into_py(py)))
1452 .clone()
1453}
1454fn py_false() -> PyObject {
1455 static FALSE: OnceCell<PyObject> = OnceCell::new();
1456 FALSE
1457 .get_or_init(|| Python::with_gil(|py| false.into_py(py)))
1458 .clone()
1459}
1460
1461trait Sender: Send + 'static {
1462 fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject>;
1463 fn close(&mut self) -> PyResult<()>;
1464}
1465
1466struct GenericSender<R>
1467where
1468 R: Runtime,
1469{
1470 runtime: PhantomData<R>,
1471 tx: mpsc::Sender<PyObject>,
1472}
1473
1474impl<R> Sender for GenericSender<R>
1475where
1476 R: Runtime + ContextExt,
1477{
1478 fn send(&mut self, locals: TaskLocals, item: PyObject) -> PyResult<PyObject> {
1479 match self.tx.try_send(item.clone()) {
1480 Ok(_) => Ok(py_true()),
1481 Err(e) => {
1482 if e.is_full() {
1483 let mut tx = self.tx.clone();
1484 Python::with_gil(move |py| {
1485 Ok(
1486 future_into_py_with_locals::<R, _, PyObject>(py, locals, async move {
1487 if tx.flush().await.is_err() {
1488 // receiving side disconnected
1489 return Ok(py_false());
1490 }
1491 if tx.send(item).await.is_err() {
1492 // receiving side disconnected
1493 return Ok(py_false());
1494 }
1495 Ok(py_true())
1496 })?
1497 .into(),
1498 )
1499 })
1500 } else {
1501 Ok(py_false())
1502 }
1503 }
1504 }
1505 }
1506 fn close(&mut self) -> PyResult<()> {
1507 self.tx.close_channel();
1508 Ok(())
1509 }
1510}
1511
1512#[pyclass]
1513struct SenderGlue {
1514 locals: TaskLocals,
1515 tx: Box<dyn Sender>,
1516}
1517#[pymethods]
1518impl SenderGlue {
1519 pub fn send(&mut self, item: PyObject) -> PyResult<PyObject> {
1520 self.tx.send(self.locals.clone(), item)
1521 }
1522 pub fn close(&mut self) -> PyResult<()> {
1523 self.tx.close()
1524 }
1525}
1526
1527#[cfg(feature = "unstable-streams")]
1528const STREAM_GLUE: &str = r#"
1529import asyncio
1530
1531async def forward(gen, sender):
1532 async for item in gen:
1533 should_continue = sender.send(item)
1534
1535 if asyncio.iscoroutine(should_continue):
1536 should_continue = await should_continue
1537
1538 if should_continue:
1539 continue
1540 else:
1541 break
1542
1543 sender.close()
1544"#;
1545
1546/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1547///
1548/// **This API is marked as unstable** and is only available when the
1549/// `unstable-streams` crate feature is enabled. This comes with no
1550/// stability guarantees, and could be changed or removed at any time.
1551///
1552/// # Arguments
1553/// * `locals` - The current task locals
1554/// * `gen` - The Python async generator to be converted
1555///
1556/// # Examples
1557/// ```no_run
1558/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1559/// #
1560/// # use pyo3_asyncio_0_21::{
1561/// # TaskLocals,
1562/// # generic::{JoinError, ContextExt, Runtime}
1563/// # };
1564/// #
1565/// # struct MyCustomJoinError;
1566/// #
1567/// # impl JoinError for MyCustomJoinError {
1568/// # fn is_panic(&self) -> bool {
1569/// # unreachable!()
1570/// # }
1571/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1572/// # unreachable!()
1573/// # }
1574/// # }
1575/// #
1576/// # struct MyCustomJoinHandle;
1577/// #
1578/// # impl Future for MyCustomJoinHandle {
1579/// # type Output = Result<(), MyCustomJoinError>;
1580/// #
1581/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1582/// # unreachable!()
1583/// # }
1584/// # }
1585/// #
1586/// # struct MyCustomRuntime;
1587/// #
1588/// # impl Runtime for MyCustomRuntime {
1589/// # type JoinError = MyCustomJoinError;
1590/// # type JoinHandle = MyCustomJoinHandle;
1591/// #
1592/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1593/// # where
1594/// # F: Future<Output = ()> + Send + 'static
1595/// # {
1596/// # unreachable!()
1597/// # }
1598/// # }
1599/// #
1600/// # impl ContextExt for MyCustomRuntime {
1601/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1602/// # where
1603/// # F: Future<Output = R> + Send + 'static
1604/// # {
1605/// # unreachable!()
1606/// # }
1607/// # fn get_task_locals() -> Option<TaskLocals> {
1608/// # unreachable!()
1609/// # }
1610/// # }
1611///
1612/// use pyo3::prelude::*;
1613/// use futures::{StreamExt, TryStreamExt};
1614///
1615/// const TEST_MOD: &str = r#"
1616/// import asyncio
1617///
1618/// async def gen():
1619/// for i in range(10):
1620/// await asyncio.sleep(0.1)
1621/// yield i
1622/// "#;
1623///
1624/// # async fn test_async_gen() -> PyResult<()> {
1625/// let stream = Python::with_gil(|py| {
1626/// let test_mod = PyModule::from_code_bound(
1627/// py,
1628/// TEST_MOD,
1629/// "test_rust_coroutine/test_mod.py",
1630/// "test_mod",
1631/// )?;
1632///
1633/// pyo3_asyncio_0_21::generic::into_stream_with_locals_v2::<MyCustomRuntime>(
1634/// pyo3_asyncio_0_21::generic::get_current_locals::<MyCustomRuntime>(py)?,
1635/// test_mod.call_method0("gen")?
1636/// )
1637/// })?;
1638///
1639/// let vals = stream
1640/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1641/// .try_collect::<Vec<i32>>()
1642/// .await?;
1643///
1644/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1645///
1646/// Ok(())
1647/// # }
1648/// ```
1649#[cfg(feature = "unstable-streams")]
1650pub fn into_stream_with_locals_v2<'p, R>(
1651 locals: TaskLocals,
1652 gen: Bound<'p, PyAny>,
1653) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1654where
1655 R: Runtime + ContextExt,
1656{
1657 static GLUE_MOD: OnceCell<PyObject> = OnceCell::new();
1658 let py = gen.py();
1659 let glue = GLUE_MOD
1660 .get_or_try_init(|| -> PyResult<PyObject> {
1661 Ok(PyModule::from_code_bound(
1662 py,
1663 STREAM_GLUE,
1664 "pyo3_asyncio/pyo3_asyncio_glue.py",
1665 "pyo3_asyncio_glue",
1666 )?
1667 .into())
1668 })?
1669 .bind(py);
1670
1671 let (tx, rx) = mpsc::channel(10);
1672
1673 locals.event_loop(py).call_method1(
1674 "call_soon_threadsafe",
1675 (
1676 locals.event_loop(py).getattr("create_task")?,
1677 glue.call_method1(
1678 "forward",
1679 (
1680 gen,
1681 SenderGlue {
1682 locals,
1683 tx: Box::new(GenericSender {
1684 runtime: PhantomData::<R>,
1685 tx,
1686 }),
1687 },
1688 ),
1689 )?,
1690 ),
1691 )?;
1692 Ok(rx)
1693}
1694
1695/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>unstable-streams</code></span> Convert an async generator into a stream
1696///
1697/// **This API is marked as unstable** and is only available when the
1698/// `unstable-streams` crate feature is enabled. This comes with no
1699/// stability guarantees, and could be changed or removed at any time.
1700///
1701/// # Arguments
1702/// * `gen` - The Python async generator to be converted
1703///
1704/// # Examples
1705/// ```no_run
1706/// # use std::{any::Any, task::{Context, Poll}, pin::Pin, future::Future};
1707/// #
1708/// # use pyo3_asyncio_0_21::{
1709/// # TaskLocals,
1710/// # generic::{JoinError, ContextExt, Runtime}
1711/// # };
1712/// #
1713/// # struct MyCustomJoinError;
1714/// #
1715/// # impl JoinError for MyCustomJoinError {
1716/// # fn is_panic(&self) -> bool {
1717/// # unreachable!()
1718/// # }
1719/// # fn into_panic(self) -> Box<(dyn Any + Send + 'static)> {
1720/// # unreachable!()
1721/// # }
1722/// # }
1723/// #
1724/// # struct MyCustomJoinHandle;
1725/// #
1726/// # impl Future for MyCustomJoinHandle {
1727/// # type Output = Result<(), MyCustomJoinError>;
1728/// #
1729/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
1730/// # unreachable!()
1731/// # }
1732/// # }
1733/// #
1734/// # struct MyCustomRuntime;
1735/// #
1736/// # impl Runtime for MyCustomRuntime {
1737/// # type JoinError = MyCustomJoinError;
1738/// # type JoinHandle = MyCustomJoinHandle;
1739/// #
1740/// # fn spawn<F>(fut: F) -> Self::JoinHandle
1741/// # where
1742/// # F: Future<Output = ()> + Send + 'static
1743/// # {
1744/// # unreachable!()
1745/// # }
1746/// # }
1747/// #
1748/// # impl ContextExt for MyCustomRuntime {
1749/// # fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
1750/// # where
1751/// # F: Future<Output = R> + Send + 'static
1752/// # {
1753/// # unreachable!()
1754/// # }
1755/// # fn get_task_locals() -> Option<TaskLocals> {
1756/// # unreachable!()
1757/// # }
1758/// # }
1759///
1760/// use pyo3::prelude::*;
1761/// use futures::{StreamExt, TryStreamExt};
1762///
1763/// const TEST_MOD: &str = r#"
1764/// import asyncio
1765///
1766/// async def gen():
1767/// for i in range(10):
1768/// await asyncio.sleep(0.1)
1769/// yield i
1770/// "#;
1771///
1772/// # async fn test_async_gen() -> PyResult<()> {
1773/// let stream = Python::with_gil(|py| {
1774/// let test_mod = PyModule::from_code_bound(
1775/// py,
1776/// TEST_MOD,
1777/// "test_rust_coroutine/test_mod.py",
1778/// "test_mod",
1779/// )?;
1780///
1781/// pyo3_asyncio_0_21::generic::into_stream_v2::<MyCustomRuntime>(test_mod.call_method0("gen")?)
1782/// })?;
1783///
1784/// let vals = stream
1785/// .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.bind(py).extract()?) }))
1786/// .try_collect::<Vec<i32>>()
1787/// .await?;
1788///
1789/// assert_eq!((0..10).collect::<Vec<i32>>(), vals);
1790///
1791/// Ok(())
1792/// # }
1793/// ```
1794#[cfg(feature = "unstable-streams")]
1795pub fn into_stream_v2<'p, R>(
1796 gen: Bound<'p, PyAny>,
1797) -> PyResult<impl futures::Stream<Item = PyObject> + 'static>
1798where
1799 R: Runtime + ContextExt,
1800{
1801 into_stream_with_locals_v2::<R>(get_current_locals::<R>(gen.py())?, gen)
1802}