Skip to main content

paimon_datafusion/
runtime.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::OnceLock;
20
21use tokio::runtime::{Handle, Runtime};
22
23static RUNTIME: OnceLock<Runtime> = OnceLock::new();
24
25fn global_runtime() -> &'static Runtime {
26    RUNTIME.get_or_init(|| {
27        Runtime::new()
28            .expect("failed to build global tokio runtime for paimon datafusion integration")
29    })
30}
31
32/// Returns a [`Handle`] to the global Tokio runtime.
33///
34/// If a Tokio runtime is already entered on the current thread, its handle is
35/// returned directly. Otherwise a lazily-initialised global runtime is used.
36pub fn runtime() -> Handle {
37    match Handle::try_current() {
38        Ok(h) => h,
39        _ => global_runtime().handle().clone(),
40    }
41}
42
43// These helpers work around DataFusion FFI callbacks that may run without an
44// entered Tokio runtime. See https://github.com/apache/datafusion/issues/16312.
45// A global OnceLock<Runtime> avoids creating a new runtime on every call;
46// if DataFusion fixes runtime propagation end-to-end, we should be able to
47// remove these manual fallback runtimes.
48pub(crate) async fn await_with_runtime<F>(future: F) -> F::Output
49where
50    F: Future,
51{
52    if Handle::try_current().is_ok() {
53        future.await
54    } else {
55        global_runtime().block_on(future)
56    }
57}
58
59// The blocking variant is for synchronous DataFusion FFI callbacks such as
60// CatalogProvider::schema(), where we cannot `.await` directly.
61pub(crate) fn block_on_with_runtime<F>(future: F, panic_error: &'static str) -> F::Output
62where
63    F: Future + Send + 'static,
64    F::Output: Send + 'static,
65{
66    if Handle::try_current().is_ok() {
67        let handle = global_runtime().handle().clone();
68        std::thread::spawn(move || handle.block_on(future))
69            .join()
70            .expect(panic_error)
71    } else {
72        global_runtime().block_on(future)
73    }
74}