1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
use std::{future::Future, thread}; use ::tokio::{ runtime::{Builder, Runtime}, task, }; use futures::future::pending; use once_cell::sync::OnceCell; use pyo3::prelude::*; use crate::generic; /// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> /// re-exports for macros #[cfg(feature = "attributes")] pub mod re_exports { /// re-export pending to be used in tokio macros without additional dependency pub use futures::future::pending; /// re-export tokio::runtime to build runtimes in tokio macros without additional dependency pub use tokio::runtime; } /// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> #[cfg(feature = "attributes")] pub use pyo3_asyncio_macros::tokio_main as main; /// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>attributes</code></span> /// <span class="module-item stab portability" style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"><code>testing</code></span> /// Registers a `tokio` test with the `pyo3-asyncio` test harness #[cfg(all(feature = "attributes", feature = "testing"))] pub use pyo3_asyncio_macros::tokio_test as test; static TOKIO_RUNTIME: OnceCell<Runtime> = OnceCell::new(); const EXPECT_TOKIO_INIT: &str = "Tokio runtime must be initialized"; impl generic::JoinError for task::JoinError { fn is_panic(&self) -> bool { task::JoinError::is_panic(self) } } struct TokioRuntime; impl generic::Runtime for TokioRuntime { type JoinError = task::JoinError; type JoinHandle = task::JoinHandle<()>; fn spawn<F>(fut: F) -> Self::JoinHandle where F: Future<Output = ()> + Send + 'static, { get_runtime().spawn(async move { fut.await; }) } } /// Initialize the Tokio Runtime with a custom build pub fn init(runtime: Runtime) { TOKIO_RUNTIME .set(runtime) .expect("Tokio Runtime has already been initialized"); } /// Initialize the Tokio Runtime with current-thread scheduler pub fn init_current_thread() { init( Builder::new_current_thread() .enable_all() .build() .expect("Couldn't build the current-thread Tokio runtime"), ); thread::spawn(|| { get_runtime().block_on(pending::<()>()); }); } /// Get a reference to the current tokio runtime pub fn get_runtime<'a>() -> &'a Runtime { TOKIO_RUNTIME.get().expect(EXPECT_TOKIO_INIT) } /// Initialize the Tokio Runtime with the multi-thread scheduler pub fn init_multi_thread() { init( Builder::new_multi_thread() .enable_all() .build() .expect("Couldn't build the multi-thread Tokio runtime"), ); } /// Run the event loop until the given Future completes /// /// The event loop runs until the given future is complete. /// /// After this function returns, the event loop can be resumed with either [`run_until_complete`] or /// [`crate::run_forever`] /// /// # Arguments /// * `py` - The current PyO3 GIL guard /// * `fut` - The future to drive to completion /// /// # Examples /// /// ``` /// # use std::time::Duration; /// # /// # use pyo3::prelude::*; /// # use tokio::runtime::{Builder, Runtime}; /// # /// # let runtime = Builder::new_current_thread() /// # .enable_all() /// # .build() /// # .expect("Couldn't build the runtime"); /// # /// # Python::with_gil(|py| { /// # pyo3_asyncio::with_runtime(py, || { /// # pyo3_asyncio::tokio::init_current_thread(); /// pyo3_asyncio::tokio::run_until_complete(py, async move { /// tokio::time::sleep(Duration::from_secs(1)).await; /// Ok(()) /// })?; /// # Ok(()) /// # }) /// # .map_err(|e| { /// # e.print_and_set_sys_last_vars(py); /// # }) /// # .unwrap(); /// # }); /// ``` pub fn run_until_complete<F>(py: Python, fut: F) -> PyResult<()> where F: Future<Output = PyResult<()>> + Send + 'static, { generic::run_until_complete::<TokioRuntime, _>(py, fut) } /// Convert a Rust Future into a Python coroutine /// /// # Arguments /// * `py` - The current PyO3 GIL guard /// * `fut` - The Rust future to be converted /// /// # Examples /// /// ``` /// use std::time::Duration; /// /// use pyo3::prelude::*; /// /// /// Awaitable sleep function /// #[pyfunction] /// fn sleep_for(py: Python, secs: &PyAny) -> PyResult<PyObject> { /// let secs = secs.extract()?; /// /// pyo3_asyncio::tokio::into_coroutine(py, async move { /// tokio::time::sleep(Duration::from_secs(secs)).await; /// Python::with_gil(|py| Ok(py.None())) /// }) /// } /// ``` pub fn into_coroutine<F>(py: Python, fut: F) -> PyResult<PyObject> where F: Future<Output = PyResult<PyObject>> + Send + 'static, { generic::into_coroutine::<TokioRuntime, _>(py, fut) }