paimon_datafusion/runtime.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::OnceLock;
20
21use tokio::runtime::{Handle, Runtime};
22
23static RUNTIME: OnceLock<Runtime> = OnceLock::new();
24
25fn global_runtime() -> &'static Runtime {
26 RUNTIME.get_or_init(|| {
27 Runtime::new()
28 .expect("failed to build global tokio runtime for paimon datafusion integration")
29 })
30}
31
32/// Returns a [`Handle`] to the global Tokio runtime.
33///
34/// If a Tokio runtime is already entered on the current thread, its handle is
35/// returned directly. Otherwise a lazily-initialised global runtime is used.
36pub fn runtime() -> Handle {
37 match Handle::try_current() {
38 Ok(h) => h,
39 _ => global_runtime().handle().clone(),
40 }
41}
42
43// These helpers work around DataFusion FFI callbacks that may run without an
44// entered Tokio runtime. See https://github.com/apache/datafusion/issues/16312.
45// A global OnceLock<Runtime> avoids creating a new runtime on every call;
46// if DataFusion fixes runtime propagation end-to-end, we should be able to
47// remove these manual fallback runtimes.
48pub(crate) async fn await_with_runtime<F>(future: F) -> F::Output
49where
50 F: Future,
51{
52 if Handle::try_current().is_ok() {
53 future.await
54 } else {
55 global_runtime().block_on(future)
56 }
57}
58
59// The blocking variant is for synchronous DataFusion FFI callbacks such as
60// CatalogProvider::schema(), where we cannot `.await` directly.
61pub(crate) fn block_on_with_runtime<F>(future: F, panic_error: &'static str) -> F::Output
62where
63 F: Future + Send + 'static,
64 F::Output: Send + 'static,
65{
66 if Handle::try_current().is_ok() {
67 let handle = global_runtime().handle().clone();
68 std::thread::spawn(move || handle.block_on(future))
69 .join()
70 .expect(panic_error)
71 } else {
72 global_runtime().block_on(future)
73 }
74}