pyo3_async_runtimes/
lib.rs

1#![warn(missing_docs)]
2#![allow(clippy::borrow_deref_ref)]
3
4//! Rust Bindings to the Python Asyncio Event Loop
5//!
6//! # Motivation
7//!
8//! This crate aims to provide a convenient interface to manage the interop between Python and
9//! Rust's async/await models. It supports conversions between Rust and Python futures and manages
10//! the event loops for both languages. Python's threading model and GIL can make this interop a bit
11//! trickier than one might expect, so there are a few caveats that users should be aware of.
12//!
13//! ## Why Two Event Loops
14//!
15//! Currently, we don't have a way to run Rust futures directly on Python's event loop. Likewise,
16//! Python's coroutines cannot be directly spawned on a Rust event loop. The two coroutine models
17//! require some additional assistance from their event loops, so in all likelihood they will need
18//! a new _unique_ event loop that addresses the needs of both languages if the coroutines are to
19//! be run on the same loop.
20//!
21//! It's not immediately clear that this would provide worthwhile performance wins either, so in the
22//! interest of getting something simple out there to facilitate these conversions, this crate
23//! handles the communication between _separate_ Python and Rust event loops.
24//!
25//! ## Python's Event Loop and the Main Thread
26//!
27//! Python is very picky about the threads used by the `asyncio` executor. In particular, it needs
28//! to have control over the main thread in order to handle signals like CTRL-C correctly. This
29//! means that Cargo's default test harness will no longer work since it doesn't provide a method of
30//! overriding the main function to add our event loop initialization and finalization.
31//!
32//! ## Event Loop References and ContextVars
33//!
34//! One problem that arises when interacting with Python's asyncio library is that the functions we
35//! use to get a reference to the Python event loop can only be called in certain contexts. Since
36//! PyO3 Asyncio needs to interact with Python's event loop during conversions, the context of these
37//! conversions can matter a lot.
38//!
39//! Likewise, Python's `contextvars` library can require some special treatment. Python functions
40//! and coroutines can rely on the context of outer coroutines to function correctly, so this
41//! library needs to be able to preserve `contextvars` during conversions.
42//!
43//! > The core conversions we've mentioned so far in the README should insulate you from these
44//! > concerns in most cases. For the edge cases where they don't, this section should provide you
45//! > with the information you need to solve these problems.
46//!
47//! ### The Main Dilemma
48//!
49//! Python programs can have many independent event loop instances throughout the lifetime of the
50//! application (`asyncio.run` for example creates its own event loop each time it's called for
51//! instance), and they can even run concurrent with other event loops. For this reason, the most
52//! correct method of obtaining a reference to the Python event loop is via
53//! `asyncio.get_running_loop`.
54//!
55//! `asyncio.get_running_loop` returns the event loop associated with the current OS thread. It can
56//! be used inside Python coroutines to spawn concurrent tasks, interact with timers, or in our case
57//! signal between Rust and Python. This is all well and good when we are operating on a Python
58//! thread, but since Rust threads are not associated with a Python event loop,
59//! `asyncio.get_running_loop` will fail when called on a Rust runtime.
60//!
61//! `contextvars` operates in a similar way, though the current context is not always associated
62//! with the current OS thread. Different contexts can be associated with different coroutines even
63//! if they run on the same OS thread.
64//!
65//! ### The Solution
66//!
67//! A really straightforward way of dealing with this problem is to pass references to the
68//! associated Python event loop and context for every conversion. That's why we have a structure
69//! called `TaskLocals` and a set of conversions that accept it.
70//!
71//! `TaskLocals` stores the current event loop, and allows the user to copy the current Python
72//! context if necessary. The following conversions will use these references to perform the
73//! necessary conversions and restore Python context when needed:
74//!
75//! - `pyo3_async_runtimes::into_future_with_locals` - Convert a Python awaitable into a Rust future.
76//! - `pyo3_async_runtimes::<runtime>::future_into_py_with_locals` - Convert a Rust future into a Python
77//!   awaitable.
78//! - `pyo3_async_runtimes::<runtime>::local_future_into_py_with_locals` - Convert a `!Send` Rust future
79//!   into a Python awaitable.
80//!
81//! One clear disadvantage to this approach is that the Rust application has to explicitly track
82//! these references. In native libraries, we can't make any assumptions about the underlying event
83//! loop, so the only reliable way to make sure our conversions work properly is to store these
84//! references at the callsite to use later on.
85//!
86//! ```rust
87//! use pyo3::{wrap_pyfunction, prelude::*};
88//!
89//! # #[cfg(feature = "tokio-runtime")]
90//! #[pyfunction]
91//! fn sleep(py: Python) -> PyResult<Bound<PyAny>> {
92//!     // Construct the task locals structure with the current running loop and context
93//!     let locals = pyo3_async_runtimes::TaskLocals::with_running_loop(py)?.copy_context(py)?;
94//!
95//!     // Convert the async move { } block to a Python awaitable
96//!     pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone(), async move {
97//!         let py_sleep = Python::attach(|py| {
98//!             // Sometimes we need to call other async Python functions within
99//!             // this future. In order for this to work, we need to track the
100//!             // event loop from earlier.
101//!             pyo3_async_runtimes::into_future_with_locals(
102//!                 &locals,
103//!                 py.import("asyncio")?.call_method1("sleep", (1,))?
104//!             )
105//!         })?;
106//!
107//!         py_sleep.await?;
108//!
109//!         Ok(())
110//!     })
111//! }
112//!
113//! # #[cfg(feature = "tokio-runtime")]
114//! #[pymodule]
115//! fn my_mod(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
116//!     m.add_function(wrap_pyfunction!(sleep, m)?)?;
117//!     Ok(())
118//! }
119//! ```
120//!
121//! > A naive solution to this tracking problem would be to cache a global reference to the asyncio
122//! > event loop that all PyO3 Asyncio conversions can use. In fact this is what we did in PyO3
123//! > Asyncio `v0.13`. This works well for applications, but it soon became clear that this is not
124//! > so ideal for libraries. Libraries usually have no direct control over how the event loop is
125//! > managed, they're just expected to work with any event loop at any point in the application.
126//! > This problem is compounded further when multiple event loops are used in the application since
127//! > the global reference will only point to one.
128//!
129//! Another disadvantage to this explicit approach that is less obvious is that we can no longer
130//! call our `#[pyfunction] fn sleep` on a Rust runtime since `asyncio.get_running_loop` only works
131//! on Python threads! It's clear that we need a slightly more flexible approach.
132//!
133//! In order to detect the Python event loop at the callsite, we need something like
134//! `asyncio.get_running_loop` and `contextvars.copy_context` that works for _both Python and Rust_.
135//! In Python, `asyncio.get_running_loop` uses thread-local data to retrieve the event loop
136//! associated with the current thread. What we need in Rust is something that can retrieve the
137//! Python event loop and contextvars associated with the current Rust _task_.
138//!
139//! Enter `pyo3_async_runtimes::<runtime>::get_current_locals`. This function first checks task-local data
140//! for the `TaskLocals`, then falls back on `asyncio.get_running_loop` and
141//! `contextvars.copy_context` if no task locals are found. This way both bases are
142//! covered.
143//!
144//! Now, all we need is a way to store the `TaskLocals` for the Rust future. Since this is a
145//! runtime-specific feature, you can find the following functions in each runtime module:
146//!
147//! - `pyo3_async_runtimes::<runtime>::scope` - Store the task-local data when executing the given Future.
148//! - `pyo3_async_runtimes::<runtime>::scope_local` - Store the task-local data when executing the given
149//!   `!Send` Future.
150//!
151//! With these new functions, we can make our previous example more correct:
152//!
153//! ```rust no_run
154//! use pyo3::prelude::*;
155//!
156//! # #[cfg(feature = "tokio-runtime")]
157//! #[pyfunction]
158//! fn sleep(py: Python) -> PyResult<Bound<PyAny>> {
159//!     // get the current event loop through task-local data
160//!     // OR `asyncio.get_running_loop` and `contextvars.copy_context`
161//!     let locals = pyo3_async_runtimes::tokio::get_current_locals(py)?;
162//!
163//!     pyo3_async_runtimes::tokio::future_into_py_with_locals(
164//!         py,
165//!         locals.clone(),
166//!         // Store the current locals in task-local data
167//!         pyo3_async_runtimes::tokio::scope(locals.clone(), async move {
168//!             let py_sleep = Python::attach(|py| {
169//!                 pyo3_async_runtimes::into_future_with_locals(
170//!                     // Now we can get the current locals through task-local data
171//!                     &pyo3_async_runtimes::tokio::get_current_locals(py)?,
172//!                     py.import("asyncio")?.call_method1("sleep", (1,))?
173//!                 )
174//!             })?;
175//!
176//!             py_sleep.await?;
177//!
178//!             Ok(Python::attach(|py| py.None()))
179//!         })
180//!     )
181//! }
182//!
183//! # #[cfg(feature = "tokio-runtime")]
184//! #[pyfunction]
185//! fn wrap_sleep(py: Python) -> PyResult<Bound<PyAny>> {
186//!     // get the current event loop through task-local data
187//!     // OR `asyncio.get_running_loop` and `contextvars.copy_context`
188//!     let locals = pyo3_async_runtimes::tokio::get_current_locals(py)?;
189//!
190//!     pyo3_async_runtimes::tokio::future_into_py_with_locals(
191//!         py,
192//!         locals.clone(),
193//!         // Store the current locals in task-local data
194//!         pyo3_async_runtimes::tokio::scope(locals.clone(), async move {
195//!             let py_sleep = Python::attach(|py| {
196//!                 pyo3_async_runtimes::into_future_with_locals(
197//!                     &pyo3_async_runtimes::tokio::get_current_locals(py)?,
198//!                     // We can also call sleep within a Rust task since the
199//!                     // locals are stored in task local data
200//!                     sleep(py)?
201//!                 )
202//!             })?;
203//!
204//!             py_sleep.await?;
205//!
206//!             Ok(Python::attach(|py| py.None()))
207//!         })
208//!     )
209//! }
210//!
211//! # #[cfg(feature = "tokio-runtime")]
212//! #[pymodule]
213//! fn my_mod(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
214//!     m.add_function(wrap_pyfunction!(sleep, m)?)?;
215//!     m.add_function(wrap_pyfunction!(wrap_sleep, m)?)?;
216//!     Ok(())
217//! }
218//! ```
219//!
220//! Even though this is more correct, it's clearly not more ergonomic. That's why we introduced a
221//! set of functions with this functionality baked in:
222//!
223//! - `pyo3_async_runtimes::<runtime>::into_future`
224//!   > Convert a Python awaitable into a Rust future (using
225//!   > `pyo3_async_runtimes::<runtime>::get_current_locals`)
226//! - `pyo3_async_runtimes::<runtime>::future_into_py`
227//!   > Convert a Rust future into a Python awaitable (using
228//!   > `pyo3_async_runtimes::<runtime>::get_current_locals` and `pyo3_async_runtimes::<runtime>::scope` to set the
229//!   > task-local event loop for the given Rust future)
230//! - `pyo3_async_runtimes::<runtime>::local_future_into_py`
231//!   > Convert a `!Send` Rust future into a Python awaitable (using
232//!   > `pyo3_async_runtimes::<runtime>::get_current_locals` and `pyo3_async_runtimes::<runtime>::scope_local` to
233//!   > set the task-local event loop for the given Rust future).
234//!
235//! __These are the functions that we recommend using__. With these functions, the previous example
236//! can be rewritten to be more compact:
237//!
238//! ```rust
239//! use pyo3::prelude::*;
240//!
241//! # #[cfg(feature = "tokio-runtime")]
242//! #[pyfunction]
243//! fn sleep(py: Python) -> PyResult<Bound<PyAny>> {
244//!     pyo3_async_runtimes::tokio::future_into_py(py, async move {
245//!         let py_sleep = Python::attach(|py| {
246//!             pyo3_async_runtimes::tokio::into_future(
247//!                 py.import("asyncio")?.call_method1("sleep", (1,))?
248//!             )
249//!         })?;
250//!
251//!         py_sleep.await?;
252//!
253//!         Ok(Python::attach(|py| py.None()))
254//!     })
255//! }
256//!
257//! # #[cfg(feature = "tokio-runtime")]
258//! #[pyfunction]
259//! fn wrap_sleep(py: Python) -> PyResult<Bound<PyAny>> {
260//!     pyo3_async_runtimes::tokio::future_into_py(py, async move {
261//!         let py_sleep = Python::attach(|py| {
262//!             pyo3_async_runtimes::tokio::into_future(sleep(py)?)
263//!         })?;
264//!
265//!         py_sleep.await?;
266//!
267//!         Ok(Python::attach(|py| py.None()))
268//!     })
269//! }
270//!
271//! # #[cfg(feature = "tokio-runtime")]
272//! #[pymodule]
273//! fn my_mod(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
274//!     m.add_function(wrap_pyfunction!(sleep, m)?)?;
275//!     m.add_function(wrap_pyfunction!(wrap_sleep, m)?)?;
276//!     Ok(())
277//! }
278//! ```
279//!
280//! > A special thanks to [@ShadowJonathan](https://github.com/ShadowJonathan) for helping with the
281//! > design and review of these changes!
282//!
283//! ## Rust's Event Loop
284//!
285//! Currently only the Async-Std and Tokio runtimes are supported by this crate. If you need support
286//! for another runtime, feel free to make a request on GitHub (or attempt to add support yourself
287//! with the [`generic`] module)!
288//!
289//! > _In the future, we may implement first class support for more Rust runtimes. Contributions are
290//! > welcome as well!_
291//!
292//! ## Features
293//!
294//! Items marked with
295//! <span
296//!   class="module-item stab portability"
297//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
298//! ><code>attributes</code></span>
299//! > are only available when the `attributes` Cargo feature is enabled:
300//!
301//! ```toml
302//! [dependencies.pyo3-async-runtimes]
303//! version = "0.24"
304//! features = ["attributes"]
305//! ```
306//!
307//! Items marked with
308//! <span
309//!   class="module-item stab portability"
310//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
311//! ><code>async-std-runtime</code></span>
312//! > are only available when the `async-std-runtime` Cargo feature is enabled:
313//!
314//! ```toml
315//! [dependencies.pyo3-async-runtimes]
316//! version = "0.24"
317//! features = ["async-std-runtime"]
318//! ```
319//!
320//! Items marked with
321//! <span
322//!   class="module-item stab portability"
323//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
324//! ><code>tokio-runtime</code></span>
325//! > are only available when the `tokio-runtime` Cargo feature is enabled:
326//!
327//! ```toml
328//! [dependencies.pyo3-async-runtimes]
329//! version = "0.24"
330//! features = ["tokio-runtime"]
331//! ```
332//!
333//! Items marked with
334//! <span
335//!   class="module-item stab portability"
336//!   style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
337//! ><code>testing</code></span>
338//! > are only available when the `testing` Cargo feature is enabled:
339//!
340//! ```toml
341//! [dependencies.pyo3-async-runtimes]
342//! version = "0.24"
343//! features = ["testing"]
344//! ```
345
346/// Re-exported for #[test] attributes
347#[cfg(all(feature = "attributes", feature = "testing"))]
348pub use inventory;
349
350/// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span> Utilities for writing PyO3 Asyncio tests
351#[cfg(feature = "testing")]
352pub mod testing;
353
354#[cfg(feature = "async-std")]
355pub mod async_std;
356
357#[cfg(feature = "tokio-runtime")]
358pub mod tokio;
359
360/// Errors and exceptions related to PyO3 Asyncio
361pub mod err;
362
363pub mod generic;
364
365#[pymodule]
366fn pyo3_async_runtimes(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
367    m.add("RustPanic", py.get_type::<err::RustPanic>())?;
368    Ok(())
369}
370
371/// Test README
372#[doc(hidden)]
373pub mod doc_test {
374    #[allow(unused)]
375    macro_rules! doc_comment {
376        ($x:expr, $module:item) => {
377            #[doc = $x]
378            $module
379        };
380    }
381
382    #[allow(unused)]
383    macro_rules! doctest {
384        ($x:expr, $y:ident) => {
385            doc_comment!(include_str!($x), mod $y {});
386        };
387    }
388
389    #[cfg(all(
390        feature = "async-std-runtime",
391        feature = "tokio-runtime",
392        feature = "attributes"
393    ))]
394    doctest!("../README.md", readme_md);
395}
396
397use std::future::Future;
398use std::sync::Arc;
399
400use futures::channel::oneshot;
401use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict};
402
403static ASYNCIO: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
404static CONTEXTVARS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
405static ENSURE_FUTURE: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
406static GET_RUNNING_LOOP: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
407
408fn ensure_future<'p>(py: Python<'p>, awaitable: &Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
409    ENSURE_FUTURE
410        .get_or_try_init(py, || -> PyResult<Py<PyAny>> {
411            Ok(asyncio(py)?
412                .getattr(pyo3::intern!(py, "ensure_future"))?
413                .into())
414        })?
415        .bind(py)
416        .call1((awaitable,))
417}
418
419fn create_future(event_loop: Bound<'_, PyAny>) -> PyResult<Bound<'_, PyAny>> {
420    event_loop.call_method0(pyo3::intern!(event_loop.py(), "create_future"))
421}
422
423fn close(event_loop: Bound<PyAny>) -> PyResult<()> {
424    let py = event_loop.py();
425    event_loop.call_method1(
426        pyo3::intern!(py, "run_until_complete"),
427        (event_loop.call_method0(pyo3::intern!(py, "shutdown_asyncgens"))?,),
428    )?;
429
430    // how to do this prior to 3.9?
431    if event_loop.hasattr(pyo3::intern!(py, "shutdown_default_executor"))? {
432        event_loop.call_method1(
433            pyo3::intern!(py, "run_until_complete"),
434            (event_loop.call_method0(pyo3::intern!(py, "shutdown_default_executor"))?,),
435        )?;
436    }
437
438    event_loop.call_method0(pyo3::intern!(py, "close"))?;
439
440    Ok(())
441}
442
443fn asyncio(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> {
444    ASYNCIO
445        .get_or_try_init(py, || Ok(py.import("asyncio")?.into()))
446        .map(|asyncio| asyncio.bind(py))
447}
448
449/// Get a reference to the Python Event Loop from Rust
450///
451/// Equivalent to `asyncio.get_running_loop()` in Python 3.7+.
452pub fn get_running_loop(py: Python) -> PyResult<Bound<PyAny>> {
453    // Ideally should call get_running_loop, but calls get_event_loop for compatibility when
454    // get_running_loop is not available.
455    GET_RUNNING_LOOP
456        .get_or_try_init(py, || -> PyResult<Py<PyAny>> {
457            let asyncio = asyncio(py)?;
458
459            Ok(asyncio
460                .getattr(pyo3::intern!(py, "get_running_loop"))?
461                .into())
462        })?
463        .bind(py)
464        .call0()
465}
466
467fn contextvars(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> {
468    Ok(CONTEXTVARS
469        .get_or_try_init(py, || py.import("contextvars").map(|m| m.into()))?
470        .bind(py))
471}
472
473fn copy_context(py: Python) -> PyResult<Bound<PyAny>> {
474    contextvars(py)?.call_method0(pyo3::intern!(py, "copy_context"))
475}
476
477/// Task-local inner structure.
478#[derive(Debug)]
479struct TaskLocalsInner {
480    /// Track the event loop of the Python task
481    event_loop: Py<PyAny>,
482    /// Track the contextvars of the Python task
483    context: Py<PyAny>,
484}
485
486/// Task-local data to store for Python conversions.
487#[derive(Debug)]
488pub struct TaskLocals(Arc<TaskLocalsInner>);
489
490impl TaskLocals {
491    /// At a minimum, TaskLocals must store the event loop.
492    pub fn new(event_loop: Bound<PyAny>) -> Self {
493        Self(Arc::new(TaskLocalsInner {
494            context: event_loop.py().None(),
495            event_loop: event_loop.into(),
496        }))
497    }
498
499    /// Construct TaskLocals with the event loop returned by `get_running_loop`
500    pub fn with_running_loop(py: Python) -> PyResult<Self> {
501        Ok(Self::new(get_running_loop(py)?))
502    }
503
504    /// Manually provide the contextvars for the current task.
505    pub fn with_context(self, context: Bound<PyAny>) -> Self {
506        Self(Arc::new(TaskLocalsInner {
507            event_loop: self.0.event_loop.clone_ref(context.py()),
508            context: context.into(),
509        }))
510    }
511
512    /// Capture the current task's contextvars
513    pub fn copy_context(self, py: Python) -> PyResult<Self> {
514        Ok(self.with_context(copy_context(py)?))
515    }
516
517    /// Get a reference to the event loop
518    pub fn event_loop<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> {
519        self.0.event_loop.clone_ref(py).into_bound(py)
520    }
521
522    /// Get a reference to the python context
523    pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> {
524        self.0.context.clone_ref(py).into_bound(py)
525    }
526
527    /// Create a clone of the TaskLocals. No longer uses the runtime, use `clone` instead.
528    #[deprecated(note = "please use `clone` instead")]
529    pub fn clone_ref(&self, _py: Python<'_>) -> Self {
530        self.clone()
531    }
532}
533
534impl Clone for TaskLocals {
535    /// Create a clone of the TaskLocals by incrementing the reference counter of the inner
536    /// structure.
537    fn clone(&self) -> Self {
538        Self(self.0.clone())
539    }
540}
541
542#[pyclass]
543struct PyTaskCompleter {
544    tx: Option<oneshot::Sender<PyResult<Py<PyAny>>>>,
545}
546
547#[pymethods]
548impl PyTaskCompleter {
549    #[pyo3(signature = (task))]
550    pub fn __call__(&mut self, task: &Bound<PyAny>) -> PyResult<()> {
551        let py = task.py();
552        debug_assert!(task.call_method0(pyo3::intern!(py, "done"))?.extract()?);
553        let result = match task.call_method0(pyo3::intern!(py, "result")) {
554            Ok(val) => Ok(val.into()),
555            Err(e) => Err(e),
556        };
557
558        // unclear to me whether or not this should be a panic or silent error.
559        //
560        // calling PyTaskCompleter twice should not be possible, but I don't think it really hurts
561        // anything if it happens.
562        if let Some(tx) = self.tx.take() {
563            if tx.send(result).is_err() {
564                // cancellation is not an error
565            }
566        }
567
568        Ok(())
569    }
570}
571
572#[pyclass]
573struct PyEnsureFuture {
574    awaitable: Py<PyAny>,
575    tx: Option<oneshot::Sender<PyResult<Py<PyAny>>>>,
576}
577
578#[pymethods]
579impl PyEnsureFuture {
580    pub fn __call__(&mut self) -> PyResult<()> {
581        Python::attach(|py| {
582            let task = ensure_future(py, self.awaitable.bind(py))?;
583            let on_complete = PyTaskCompleter { tx: self.tx.take() };
584            task.call_method1(pyo3::intern!(py, "add_done_callback"), (on_complete,))?;
585
586            Ok(())
587        })
588    }
589}
590
591fn call_soon_threadsafe<'py>(
592    event_loop: &Bound<'py, PyAny>,
593    context: &Bound<PyAny>,
594    args: impl PyCallArgs<'py>,
595) -> PyResult<()> {
596    let py = event_loop.py();
597
598    let kwargs = PyDict::new(py);
599    kwargs.set_item(pyo3::intern!(py, "context"), context)?;
600
601    event_loop.call_method(
602        pyo3::intern!(py, "call_soon_threadsafe"),
603        args,
604        Some(&kwargs),
605    )?;
606    Ok(())
607}
608
609/// Convert a Python `awaitable` into a Rust Future
610///
611/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
612/// completion handler sends the result of this Task through a
613/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
614/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
615///
616/// # Arguments
617/// * `locals` - The Python event loop and context to be used for the provided awaitable
618/// * `awaitable` - The Python `awaitable` to be converted
619///
620/// # Examples
621///
622/// ```
623/// use std::time::Duration;
624/// use std::ffi::CString;
625///
626/// use pyo3::prelude::*;
627///
628/// const PYTHON_CODE: &'static str = r#"
629/// import asyncio
630///
631/// async def py_sleep(duration):
632///     await asyncio.sleep(duration)
633/// "#;
634///
635/// # #[cfg(feature = "tokio-runtime")]
636/// async fn py_sleep(seconds: f32) -> PyResult<()> {
637///     let test_mod = Python::attach(|py| -> PyResult<Py<PyAny>> {
638///         Ok(
639///             PyModule::from_code(
640///                 py,
641///                 &CString::new(PYTHON_CODE).unwrap(),
642///                 &CString::new("test_into_future/test_mod.py").unwrap(),
643///                 &CString::new("test_mod").unwrap(),
644///             )?
645///             .into()
646///         )
647///     })?;
648///
649///     Python::attach(|py| {
650///         pyo3_async_runtimes::into_future_with_locals(
651///             &pyo3_async_runtimes::tokio::get_current_locals(py)?,
652///             test_mod
653///                 .call_method1(py, "py_sleep", (seconds,))?
654///                 .into_bound(py),
655///         )
656///     })?
657///     .await?;
658///     Ok(())
659/// }
660/// ```
661pub fn into_future_with_locals(
662    locals: &TaskLocals,
663    awaitable: Bound<PyAny>,
664) -> PyResult<impl Future<Output = PyResult<Py<PyAny>>> + Send> {
665    let py = awaitable.py();
666    let (tx, rx) = oneshot::channel();
667
668    call_soon_threadsafe(
669        &locals.event_loop(py),
670        &locals.context(py),
671        (PyEnsureFuture {
672            awaitable: awaitable.into(),
673            tx: Some(tx),
674        },),
675    )?;
676
677    Ok(async move {
678        match rx.await {
679            Ok(item) => item,
680            Err(_) => Python::attach(|py| {
681                Err(PyErr::from_value(
682                    asyncio(py)?.call_method0(pyo3::intern!(py, "CancelledError"))?,
683                ))
684            }),
685        }
686    })
687}
688
689fn dump_err(py: Python<'_>) -> impl FnOnce(PyErr) + '_ {
690    move |e| {
691        // We can't display Python exceptions via std::fmt::Display,
692        // so print the error here manually.
693        e.print_and_set_sys_last_vars(py);
694    }
695}