lance_core/utils/
tokio.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::atomic::Ordering;
5use std::sync::{atomic, LazyLock};
6use std::time::Duration;
7
8use crate::Result;
9
10use futures::{Future, FutureExt};
11use tokio::runtime::{Builder, Runtime};
12use tracing::Span;
13
/// We cache the call to num_cpus::get() because:
///
/// 1. It shouldn't change during the lifetime of the program
/// 2. It's a relatively expensive call (requires opening several files and examining them)
static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
    LazyLock::new(calculate_num_compute_intensive_cpus);

/// Returns the cached number of CPUs to dedicate to compute-intensive work
/// (see [`calculate_num_compute_intensive_cpus`] for how it is derived).
pub fn get_num_compute_intensive_cpus() -> usize {
    *NUM_COMPUTE_INTENSIVE_CPUS
}
24
25fn calculate_num_compute_intensive_cpus() -> usize {
26    if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
27        return user_specified.parse().unwrap();
28    }
29
30    let cpus = num_cpus::get();
31
32    if cpus <= *IO_CORE_RESERVATION {
33        // If the user is not setting a custom value for LANCE_IO_CORE_RESERVATION then we don't emit
34        // a warning because they're just on a small machine and there isn't much they can do about it.
35        if cpus > 2 {
36            log::warn!(
37                "Number of CPUs is less than or equal to the number of IO core reservations. \
38                This is not a supported configuration. using 1 CPU for compute intensive tasks."
39            );
40        }
41        return 1;
42    }
43
44    num_cpus::get() - *IO_CORE_RESERVATION
45}
46
/// Number of CPU cores reserved for I/O work (default: 2), overridable via
/// the `LANCE_IO_CORE_RESERVATION` environment variable.
///
/// Panics on first access if the variable is set to a non-integer value.
pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
    // Parse only when the variable is actually present; this avoids the
    // unconditional `"2".to_string()` allocation the eager `unwrap_or` form
    // would perform even when the variable is unset.
    std::env::var("LANCE_IO_CORE_RESERVATION")
        .map(|value| {
            value
                .parse()
                .expect("LANCE_IO_CORE_RESERVATION must be a valid integer")
        })
        .unwrap_or(2)
});
53
54fn create_runtime() -> Runtime {
55    Builder::new_multi_thread()
56        .thread_name("lance-cpu")
57        .max_blocking_threads(get_num_compute_intensive_cpus())
58        .worker_threads(1)
59        // keep the thread alive "forever"
60        .thread_keep_alive(Duration::from_secs(u64::MAX))
61        .build()
62        .unwrap()
63}
64
/// Pointer to the lazily-created CPU runtime. Null until the first call to
/// `global_cpu_runtime` publishes it (and reset to null in a forked child).
static CPU_RUNTIME: atomic::AtomicPtr<Runtime> = atomic::AtomicPtr::new(std::ptr::null_mut());

/// Claimed (set to true) by the single thread that wins the right to build
/// the runtime; other threads spin until `CPU_RUNTIME` becomes non-null.
static RUNTIME_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);

/// Ensures the atfork handler is registered at most once per process.
static ATFORK_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
70
/// Returns the process-wide CPU runtime, creating it on first use.
///
/// Exactly one caller wins the `RUNTIME_INSTALLED` flag and builds the
/// runtime; all other concurrent callers spin (yielding) until the winner
/// publishes the pointer. After a fork(), the atfork child handler clears
/// both the pointer and the flag so the child builds a fresh runtime.
///
/// NOTE(review): repeated calls hand out multiple `&'static mut` references
/// aliasing the same `Runtime`, which is aliased mutable state; the visible
/// caller (`spawn_cpu`) only needs `&Runtime` for `spawn_blocking` — consider
/// returning a shared reference. TODO confirm no caller relies on `&mut`.
fn global_cpu_runtime() -> &'static mut Runtime {
    loop {
        let ptr = CPU_RUNTIME.load(Ordering::SeqCst);
        if !ptr.is_null() {
            // Runtime already installed; it is never dropped once published.
            return unsafe { &mut *ptr };
        }
        // fetch_or returns the previous value: `false` means we won the race
        // and are responsible for building and publishing the runtime.
        if !RUNTIME_INSTALLED.fetch_or(true, Ordering::SeqCst) {
            break;
        }
        // Another thread is building the runtime; wait for it to publish.
        std::thread::yield_now();
    }
    // Register the fork handler once per process, before the first runtime
    // is created.
    if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
        install_atfork();
    }
    // The runtime is intentionally leaked via Box::into_raw: it must live for
    // the remainder of the process (or until a fork resets the pointer).
    let new_ptr = Box::into_raw(Box::new(create_runtime()));
    CPU_RUNTIME.store(new_ptr, Ordering::SeqCst);
    unsafe { &mut *new_ptr }
}
89
/// After a fork() operation, force re-creation of the CPU runtime in the
/// child. Note: this function runs in "async-signal context" which means that
/// we can't (safely) do much here — only plain atomic stores. In particular
/// the old runtime is never dropped; its memory is intentionally leaked
/// (the parent's runtime threads do not exist in the child anyway).
extern "C" fn atfork_tokio_child() {
    CPU_RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
    RUNTIME_INSTALLED.store(false, Ordering::SeqCst);
}
96
/// Registers `atfork_tokio_child` to run in child processes after fork().
#[cfg(not(windows))]
fn install_atfork() {
    // SAFETY: pthread_atfork only requires valid (or None) function pointers;
    // the registered handler performs nothing but atomic stores, which are
    // async-signal-safe.
    unsafe { libc::pthread_atfork(None, None, Some(atfork_tokio_child)) };
}
101
/// No-op on Windows: there is no fork(), so no atfork handler is needed.
#[cfg(windows)]
fn install_atfork() {}
104
105/// Spawn a CPU intensive task
106///
107/// This task will be put onto a thread pool dedicated for CPU-intensive work
108/// This keeps the tokio thread pool free so that we can always be ready to service
109/// cheap I/O & control requests.
110///
111/// This can also be used to convert a big chunk of synchronous work into a future
112/// so that it can be run in parallel with something like StreamExt::buffered()
113pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
114    func: F,
115) -> impl Future<Output = Result<R>> {
116    let (send, recv) = tokio::sync::oneshot::channel();
117    // Propagate the current span into the task
118    let span = Span::current();
119    global_cpu_runtime().spawn_blocking(move || {
120        let _span_guard = span.enter();
121        let result = func();
122        let _ = send.send(result);
123    });
124    recv.map(|res| res.unwrap())
125}