datafusion_table_providers/sql/db_connection_pool/
runtime.rs

// If calling directly from Rust, there is already a Tokio runtime, so no
// additional work is needed. If calling from Python FFI, there is no existing
// Tokio runtime, so we need to start a new one.
4use std::{future::Future, sync::OnceLock};
5
6use tokio::runtime::Handle;
7
8pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
9
10#[inline]
11pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
12    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
13    RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
14}
15
16pub fn execute_in_tokio<F, Fut, T>(f: F) -> T
17where
18    F: FnOnce() -> Fut,
19    Fut: Future<Output = T>,
20{
21    get_tokio_runtime().0.block_on(f())
22}
23
24pub async fn run_async_with_tokio<F, Fut, T, E>(f: F) -> Result<T, E>
25where
26    F: FnOnce() -> Fut,
27    Fut: Future<Output = Result<T, E>>,
28{
29    match Handle::try_current() {
30        Ok(_) => f().await,
31        Err(_) => execute_in_tokio(f),
32    }
33}
34
35pub fn run_sync_with_tokio<T, E>(f: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
36    match Handle::try_current() {
37        Ok(_) => f(),
38        Err(_) => execute_in_tokio(|| async { f() }),
39    }
40}